darkfid: better locks handling and minor code improvements

This commit is contained in:
skoupidi
2024-02-29 14:54:19 +02:00
parent dc6f4ecd6c
commit d7495510dd
7 changed files with 134 additions and 106 deletions

View File

@@ -123,7 +123,7 @@ impl ProtocolProposal {
let proposal_copy = (*proposal).clone();
match self.validator.consensus.append_proposal(&proposal_copy.0).await {
match self.validator.append_proposal(&proposal_copy.0).await {
Ok(()) => {
self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await;
if let Some(sync_p2p) = self.sync_p2p.as_ref() {
@@ -189,7 +189,12 @@ impl ProtocolProposal {
}
for proposal in &response.proposals {
self.validator.consensus.append_proposal(proposal).await?;
self.validator.append_proposal(proposal).await?;
let message = ProposalMessage(proposal.clone());
self.p2p.broadcast_with_exclude(&message, &exclude_list).await;
if let Some(sync_p2p) = self.sync_p2p.as_ref() {
sync_p2p.broadcast_with_exclude(&message, &exclude_list).await;
}
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
self.subscriber.notify(vec![enc_prop].into()).await;

View File

@@ -109,16 +109,15 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool)
// Start miner loop
loop {
// Grab best current fork proposals sequence
// Grab best current fork
let forks = node.validator.consensus.forks.read().await;
let fork_index = best_forks_indexes(&forks)?[0];
let fork_proposals = forks[fork_index].proposals.clone();
let extended_fork = forks[best_forks_indexes(&forks)?[0]].full_clone()?;
drop(forks);
// Start listenning for network proposals and mining next block for best fork.
smol::future::or(
listen_to_network(node, fork_proposals, &subscription),
mine_next_block(node, fork_index, &mut secret, recipient, &zkbin, &pk),
listen_to_network(node, &extended_fork, &subscription),
mine_next_block(node, &extended_fork, &mut secret, recipient, &zkbin, &pk),
)
.await?;
@@ -137,10 +136,12 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool)
/// Auxiliary function to listen for incoming proposals and check if the best fork has changed
async fn listen_to_network(
node: &Darkfid,
fork_proposals: Vec<blake3::Hash>,
extended_fork: &Fork,
subscription: &Subscription<JsonNotification>,
) -> Result<()> {
'main_loop: loop {
// Grab extended fork last proposal hash
let last_proposal_hash = extended_fork.last_proposal()?.hash;
loop {
// Wait until a new proposal has been received
subscription.receive().await;
@@ -152,21 +153,18 @@ async fn listen_to_network(
// Iterate to verify if proposals sequence has changed
for index in fork_indexes {
if forks[index].proposals == fork_proposals {
if forks[index].last_proposal()?.hash != last_proposal_hash {
drop(forks);
continue 'main_loop
return Ok(())
}
}
drop(forks);
return Ok(())
}
}
/// Auxiliary function to generate and mine provided fork index next block
async fn mine_next_block(
node: &Darkfid,
fork_index: usize,
extended_fork: &Fork,
secret: &mut SecretKey,
recipient: &PublicKey,
zkbin: &ZkBinary,
@@ -174,7 +172,7 @@ async fn mine_next_block(
) -> Result<()> {
// Grab next target and block
let (next_target, mut next_block) =
generate_next_block(node, fork_index, secret, recipient, zkbin, pk).await?;
generate_next_block(extended_fork, secret, recipient, zkbin, pk).await?;
// Execute request to minerd and parse response
let target = JsonValue::String(next_target.to_string());
@@ -186,11 +184,11 @@ async fn mine_next_block(
next_block.sign(secret)?;
// Verify it
node.validator.consensus.module.read().await.verify_current_block(&next_block)?;
extended_fork.module.verify_current_block(&next_block)?;
// Append the mined block as a proposal
let proposal = Proposal::new(next_block)?;
node.validator.consensus.append_proposal(&proposal).await?;
node.validator.append_proposal(&proposal).await?;
// Broadcast proposal to the network
let message = ProposalMessage(proposal);
@@ -202,52 +200,42 @@ async fn mine_next_block(
/// Auxiliary function to generate next block in an atomic manner
async fn generate_next_block(
node: &Darkfid,
fork_index: usize,
extended_fork: &Fork,
secret: &mut SecretKey,
recipient: &PublicKey,
zkbin: &ZkBinary,
pk: &ProvingKey,
) -> Result<(BigUint, BlockInfo)> {
// Grab a lock over nodes' current forks
let forks = node.validator.consensus.forks.read().await;
// Grab extended fork last proposal hash
let last_proposal = extended_fork.last_proposal()?;
// Grab best current fork
let fork = &forks[fork_index];
// Generate new signing key for next block
let next_block_height = fork.get_next_block_height()?;
// We are deriving the next secret key for optimization.
// Next secret is the poseidon hash of:
// [prefix, current(previous) secret, signing(block) height].
let prefix = pallas::Base::from_raw([4, 0, 0, 0]);
let next_secret = poseidon_hash([prefix, secret.inner(), next_block_height.into()]);
let next_secret =
poseidon_hash([prefix, secret.inner(), (last_proposal.block.header.height + 1).into()]);
*secret = SecretKey::from(next_secret);
// Generate reward transaction
let tx = generate_transaction(fork, secret, recipient, zkbin, pk, next_block_height)?;
let tx = generate_transaction(&extended_fork.last_proposal()?, secret, recipient, zkbin, pk)?;
// Generate next block proposal
let target = fork.module.next_mine_target()?;
let next_block = node.validator.consensus.generate_unsigned_block(fork, tx).await?;
// Drop forks lock
drop(forks);
let target = extended_fork.module.next_mine_target()?;
let next_block = extended_fork.generate_unsigned_block(tx).await?;
Ok((target, next_block))
}
/// Auxiliary function to generate a Money::PoWReward transaction
fn generate_transaction(
fork: &Fork,
last_proposal: &Proposal,
secret: &SecretKey,
recipient: &PublicKey,
zkbin: &ZkBinary,
pk: &ProvingKey,
block_height: u64,
) -> Result<Transaction> {
// Grab extended proposal info
let last_proposal = fork.last_proposal()?;
let last_nonce = last_proposal.block.header.nonce;
let fork_previous_hash = last_proposal.block.header.previous;
@@ -259,7 +247,7 @@ fn generate_transaction(
let debris = PoWRewardCallBuilder {
secret: *secret,
recipient: *recipient,
block_height,
block_height: last_proposal.block.header.height + 1,
last_nonce,
fork_previous_hash,
spend_hook,

View File

@@ -134,7 +134,7 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
// Verify and store retrieved proposals
debug!(target: "darkfid::task::sync_task", "Processing received proposals");
for proposal in &response.proposals {
node.validator.consensus.append_proposal(proposal).await?;
node.validator.append_proposal(proposal).await?;
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
proposal_notif_sub.notify(vec![enc_prop].into()).await;

View File

@@ -34,7 +34,7 @@ fn forks() -> Result<()> {
let module = PoWModule::new(blockchain.clone(), 90, None)?;
// Create a fork
let fork = Fork::new(&blockchain, module).await?;
let fork = Fork::new(blockchain.clone(), module).await?;
// Add a dummy record to fork
fork.overlay.lock().unwrap().order.insert(&[0], &[record0])?;

View File

@@ -176,7 +176,7 @@ impl Harness {
// and then we broadcast it to rest nodes
for block in blocks {
let proposal = Proposal::new(block.clone())?;
self.alice.validator.consensus.append_proposal(&proposal).await?;
self.alice.validator.append_proposal(&proposal).await?;
let message = ProposalMessage(proposal);
self.alice.miners_p2p.as_ref().unwrap().broadcast(&message).await;
}

View File

@@ -48,6 +48,8 @@ pub struct Consensus {
pub forks: RwLock<Vec<Fork>>,
/// Canonical blockchain PoW module state
pub module: RwLock<PoWModule>,
/// Lock to restrict when proposals appends can happen
pub append_lock: RwLock<()>,
}
impl Consensus {
@@ -58,66 +60,18 @@ impl Consensus {
pow_target: usize,
pow_fixed_difficulty: Option<BigUint>,
) -> Result<Self> {
let forks = RwLock::new(vec![]);
let module =
RwLock::new(PoWModule::new(blockchain.clone(), pow_target, pow_fixed_difficulty)?);
Ok(Self { blockchain, finalization_threshold, forks: RwLock::new(vec![]), module })
}
/// Generate an unsigned block for provided fork, containing all
/// pending transactions.
pub async fn generate_unsigned_block(
&self,
fork: &Fork,
producer_tx: Transaction,
) -> Result<BlockInfo> {
// Grab forks' next block height
let next_block_height = fork.get_next_block_height()?;
// Grab forks' unproposed transactions
let mut unproposed_txs = fork.unproposed_txs(&self.blockchain, next_block_height).await?;
unproposed_txs.push(producer_tx);
// Grab forks' last block proposal(previous)
let previous = fork.last_proposal()?;
// Generate the new header
let header =
Header::new(previous.block.hash()?, next_block_height, Timestamp::current_time(), 0);
// Generate the block
let mut block = BlockInfo::new_empty(header);
// Add transactions to the block
block.append_txs(unproposed_txs)?;
Ok(block)
}
/// Generate a block proposal for provided fork, containing all
/// pending transactions. Proposal is signed using provided secret key,
/// which must also have signed the provided proposal transaction.
pub async fn generate_signed_proposal(
&self,
fork: &Fork,
producer_tx: Transaction,
secret_key: &SecretKey,
) -> Result<Proposal> {
let mut block = self.generate_unsigned_block(fork, producer_tx).await?;
// Sign block
block.sign(secret_key)?;
// Generate the block proposal from the block
let proposal = Proposal::new(block)?;
Ok(proposal)
let append_lock = RwLock::new(());
Ok(Self { blockchain, finalization_threshold, forks, module, append_lock })
}
/// Generate a new empty fork.
pub async fn generate_empty_fork(&self) -> Result<()> {
debug!(target: "validator::consensus::generate_empty_fork", "Generating new empty fork...");
let mut lock = self.forks.write().await;
let fork = Fork::new(&self.blockchain, self.module.read().await.clone()).await?;
let fork = Fork::new(self.blockchain.clone(), self.module.read().await.clone()).await?;
lock.push(fork);
drop(lock);
debug!(target: "validator::consensus::generate_empty_fork", "Fork generated!");
@@ -129,6 +83,18 @@ impl Consensus {
pub async fn append_proposal(&self, proposal: &Proposal) -> Result<()> {
debug!(target: "validator::consensus::append_proposal", "Appending proposal {}", proposal.hash);
// Check if proposal already exists
let lock = self.forks.read().await;
for fork in lock.iter() {
for p in fork.proposals.iter().rev() {
if p == &proposal.hash {
drop(lock);
return Err(Error::ProposalAlreadyExists)
}
}
}
drop(lock);
// Verify proposal and grab corresponding fork
let (mut fork, index) = verify_proposal(self, proposal).await?;
@@ -141,13 +107,6 @@ impl Consensus {
// If a fork index was found, replace forks with the mutated one,
// otherwise push the new fork.
let mut lock = self.forks.write().await;
// Check if fork already exists
for f in lock.iter() {
if f.proposals == fork.proposals {
drop(lock);
return Err(Error::ProposalAlreadyExists)
}
}
match index {
Some(i) => {
if i < lock.len() && lock[i].proposals == fork.proposals[..fork.proposals.len() - 1]
@@ -200,7 +159,7 @@ impl Consensus {
}
// Generate a new fork extending canonical
let fork = Fork::new(&self.blockchain, self.module.read().await.clone()).await?;
let fork = Fork::new(self.blockchain.clone(), self.module.read().await.clone()).await?;
return Ok((fork, None))
}
@@ -212,7 +171,7 @@ impl Consensus {
}
// Rebuild fork
let mut fork = Fork::new(&self.blockchain, self.module.read().await.clone()).await?;
let mut fork = Fork::new(self.blockchain.clone(), self.module.read().await.clone()).await?;
fork.proposals = original_fork.proposals[..p_index + 1].to_vec();
// Retrieve proposals blocks from original fork
@@ -382,6 +341,8 @@ impl From<Proposal> for BlockInfo {
/// the proposals hashes sequence, for validations.
#[derive(Clone)]
pub struct Fork {
/// Canonical (finalized) blockchain
pub blockchain: Blockchain,
/// Overlay cache over canonical Blockchain
pub overlay: BlockchainOverlayPtr,
/// Current PoW module state,
@@ -395,11 +356,62 @@ pub struct Fork {
}
impl Fork {
pub async fn new(blockchain: &Blockchain, module: PoWModule) -> Result<Self> {
pub async fn new(blockchain: Blockchain, module: PoWModule) -> Result<Self> {
let mempool =
blockchain.get_pending_txs()?.iter().map(|tx| blake3::hash(&serialize(tx))).collect();
let overlay = BlockchainOverlay::new(blockchain)?;
Ok(Self { overlay, module, proposals: vec![], mempool, rank: BigUint::from(0u64) })
let overlay = BlockchainOverlay::new(&blockchain)?;
Ok(Self {
blockchain,
overlay,
module,
proposals: vec![],
mempool,
rank: BigUint::from(0u64),
})
}
/// Generate an unsigned block containing all pending transactions.
pub async fn generate_unsigned_block(&self, producer_tx: Transaction) -> Result<BlockInfo> {
// Grab forks' last block proposal(previous)
let previous = self.last_proposal()?;
// Grab forks' next block height
let next_block_height = previous.block.header.height + 1;
// Grab forks' unproposed transactions
let mut unproposed_txs = self.unproposed_txs(&self.blockchain, next_block_height).await?;
unproposed_txs.push(producer_tx);
// Generate the new header
let header =
Header::new(previous.block.hash()?, next_block_height, Timestamp::current_time(), 0);
// Generate the block
let mut block = BlockInfo::new_empty(header);
// Add transactions to the block
block.append_txs(unproposed_txs)?;
Ok(block)
}
/// Generate a block proposal containing all pending transactions.
/// Proposal is signed using provided secret key, which must also
/// have signed the provided proposal transaction.
pub async fn generate_signed_proposal(
&self,
producer_tx: Transaction,
secret_key: &SecretKey,
) -> Result<Proposal> {
let mut block = self.generate_unsigned_block(producer_tx).await?;
// Sign block
block.sign(secret_key)?;
// Generate the block proposal from the block
let proposal = Proposal::new(block)?;
Ok(proposal)
}
/// Auxiliary function to append a proposal and recalculate current fork rank
@@ -532,12 +544,13 @@ impl Fork {
/// Changes to this copy don't affect original fork overlay records, since underlying
/// overlay pointer have been updated to the cloned one.
pub fn full_clone(&self) -> Result<Self> {
let blockchain = self.blockchain.clone();
let overlay = self.overlay.lock().unwrap().full_clone()?;
let module = self.module.clone();
let proposals = self.proposals.clone();
let mempool = self.mempool.clone();
let rank = self.rank.clone();
Ok(Self { overlay, module, proposals, mempool, rank })
Ok(Self { blockchain, overlay, module, proposals, mempool, rank })
}
}

View File

@@ -306,16 +306,35 @@ impl Validator {
Ok(())
}
/// The node locks its consensus state and tries to append provided proposal.
pub async fn append_proposal(&self, proposal: &Proposal) -> Result<()> {
// Grab append lock so we restrict concurrent calls of this function
let append_lock = self.consensus.append_lock.write().await;
// Execute append
self.consensus.append_proposal(proposal).await?;
// Release append lock
drop(append_lock);
Ok(())
}
/// The node checks if proposals can be finalized.
/// If proposals are found, node appends them to canonical, excluding the
/// last one, and rebuild the finalized fork to contain the last one.
pub async fn finalization(&self) -> Result<Vec<BlockInfo>> {
// Grab append lock so no new proposals can be appended while
// we execute finalization
let append_lock = self.consensus.append_lock.write().await;
info!(target: "validator::finalization", "Performing finalization check");
// Grab blocks that can be finalized
let mut finalized = self.consensus.finalization().await?;
if finalized.is_empty() {
info!(target: "validator::finalization", "No proposals can be finalized");
drop(append_lock);
return Ok(vec![])
}
@@ -334,6 +353,9 @@ impl Validator {
self.consensus.append_proposal(&Proposal::new(last)?).await?;
info!(target: "validator::finalization", "Finalization completed!");
// Release append lock
drop(append_lock);
Ok(finalized)
}