Working token transfers.

One bug I've noticed is that the faucet found a duplicate nullifier
upon receiving the block/tx after finalization of the transfer.
This commit is contained in:
parazyd
2022-05-01 17:00:08 +02:00
parent 1dd63d58d2
commit 4aca4b9c62
5 changed files with 62 additions and 44 deletions

View File

@@ -1,5 +1,5 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use log::{debug, error, warn};
@@ -27,6 +27,7 @@ pub struct ProtocolSync {
state: ValidatorStatePtr,
p2p: P2pPtr,
consensus_mode: bool,
pending: Mutex<bool>,
}
impl ProtocolSync {
@@ -51,11 +52,12 @@ impl ProtocolSync {
state,
p2p,
consensus_mode,
pending: Mutex::new(false),
}))
}
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!("ProtocolSync::handle_receive_request() [START]");
debug!("handle_receive_request() [START]");
loop {
let order = match self.request_sub.receive().await {
Ok(v) => v,
@@ -65,7 +67,7 @@ impl ProtocolSync {
}
};
debug!("ProtocolSync::handle_receive_request() received {:?}", order);
debug!("handle_receive_request() received {:?}", order);
// Extra validations can be added here
let key = order.sl;
@@ -76,7 +78,7 @@ impl ProtocolSync {
continue
}
};
debug!("ProtocolSync::handle_receive_request(): Found {} blocks", blocks.len());
debug!("handle_receive_request(): Found {} blocks", blocks.len());
let response = BlockResponse { blocks };
if let Err(e) = self.channel.send(response).await {
@@ -86,7 +88,7 @@ impl ProtocolSync {
}
async fn handle_receive_block(self: Arc<Self>) -> Result<()> {
debug!("ProtocolSync::handle_receive_block() [START]");
debug!("handle_receive_block() [START]");
loop {
let info = match self.block_sub.receive().await {
Ok(v) => v,
@@ -96,7 +98,13 @@ impl ProtocolSync {
}
};
debug!("ProtocolSync::handle_receive_block() received block");
debug!("handle_receive_block() received block");
// We block here if there's a pending validation, otherwise we might
// apply the same block twice.
debug!("handle_receive_block(): Waiting for pending block to apply");
while *self.pending.lock().await {}
debug!("handle_receive_block(): Pending lock released");
// Node stores finalized block, if it doesn't exist (checking by slot),
// and removes its transactions from the unconfirmed_txs vector.
@@ -104,12 +112,14 @@ impl ProtocolSync {
// during proposal finalization.
// Extra validations can be added here.
if !self.consensus_mode {
*self.pending.lock().await = true;
let info_copy = (*info).clone();
let has_block = match self.state.read().await.blockchain.has_block(&info_copy) {
Ok(v) => v,
Err(e) => {
error!("handle_receive_block(): failed checking for has_block(): {}", e);
*self.pending.lock().await = false;
continue
}
};
@@ -124,35 +134,42 @@ impl ProtocolSync {
Ok(v) => v,
Err(e) => {
warn!("handle_receive_block(): State transition fail: {}", e);
*self.pending.lock().await = false;
continue
}
};
debug!("ProtocolSync::handle_receive_block(): All state transitions passed");
debug!("handle_receive_block(): All state transitions passed");
debug!("ProtocolSync::handle_receive_block(): Updating canon state machine");
debug!("handle_receive_block(): Updating canon state machine");
if let Err(e) =
self.state.write().await.update_canon_state(state_updates, None).await
{
error!("handle_receive_block(): Canon statemachine update fail: {}", e);
*self.pending.lock().await = false;
continue
};
debug!("ProtocolSync::handle_receive_block(): Appending block to ledger");
debug!("handle_receive_block(): Appending block to ledger");
if let Err(e) = self.state.write().await.blockchain.add(&[info_copy.clone()]) {
error!("handle_receive_block(): blockchain.add() fail: {}", e);
*self.pending.lock().await = false;
continue
};
if let Err(e) = self.state.write().await.remove_txs(info_copy.txs.clone()) {
error!("handle_receive_block(): remove_txs() fail: {}", e);
*self.pending.lock().await = false;
continue
};
if let Err(e) = self.p2p.broadcast(info_copy).await {
error!("handle_receive_block(): p2p broadcast fail: {}", e);
*self.pending.lock().await = false;
continue
};
}
*self.pending.lock().await = false;
}
}
}

View File

@@ -67,33 +67,24 @@ impl ProtocolVote {
};
if voted {
match self.consensus_p2p.broadcast(vote_copy).await {
Ok(()) => {}
Err(e) => {
error!("handle_receive_vote(): consensus p2p broadcast fail: {}", e);
continue
}
if let Err(e) = self.consensus_p2p.broadcast(vote_copy).await {
error!("handle_receive_vote(): consensus p2p broadcast fail: {}", e);
continue
};
// Broadcast finalized blocks info, if any
match to_broadcast {
Some(blocks) => {
debug!("handle_receive_vote(): Broadcasting finalized blocks");
for info in blocks {
match self.sync_p2p.broadcast(info).await {
Ok(()) => {}
Err(e) => {
error!("handle_receive_vote(): sync p2p broadcast fail: {}", e);
continue
}
};
if let Some(blocks) = to_broadcast {
debug!("handle_receive_vote(): Broadcasting finalized blocks");
for info in blocks {
if let Err(e) = self.sync_p2p.broadcast(info).await {
error!("handle_receive_vote(): sync p2p broadcast fail: {}", e);
// TODO: Should we quit broadcasting if one fails?
continue
}
}
None => {
debug!("handle_receive_vote(): No finalized blocks to broadcast");
continue
}
}
} else {
debug!("handle_receive_vote(): No finalized blocks to broadcast");
};
}
}
}

View File

@@ -24,7 +24,7 @@ use crate::{
},
net,
node::{
state::{state_transition, StateUpdate},
state::{state_transition, ProgramState, StateUpdate},
Client, MemoryState, State,
},
tx::Transaction,
@@ -33,7 +33,7 @@ use crate::{
};
/// `2 * DELTA` represents epoch time
pub const DELTA: u64 = 30;
pub const DELTA: u64 = 20;
/// This struct represents the information required by the consensus algorithm
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -151,6 +151,10 @@ impl ValidatorState {
burn_vk: Lazy::new(),
}));
// Create zk proof verification keys
let _ = state_machine.lock().await.mint_vk();
let _ = state_machine.lock().await.burn_vk();
let state = Arc::new(RwLock::new(ValidatorState {
address,
secret,
@@ -506,18 +510,18 @@ impl ValidatorState {
let mut encoded_proposal = vec![];
if let Err(e) = vote.proposal.encode(&mut encoded_proposal) {
error!(target: "consensus", "Proposal encoding failed: {:?}", e);
error!("consensus: Proposal encoding failed: {:?}", e);
return Ok((false, None))
};
let va = vote.address.to_string();
if !vote.public_key.verify(&encoded_proposal, &vote.vote) {
warn!(target: "consensus", "Voter ({}), signature couldn't be verified", vote.address.to_string());
warn!("consensus: Voter ({}), signature couldn't be verified", va);
return Ok((false, None))
}
// Node refreshes participants records
self.refresh_participants()?;
let node_count = self.consensus.participants.len();
// Checking that the voter can actually vote.
@@ -525,7 +529,7 @@ impl ValidatorState {
Some(participant) => {
let mut participant = participant.clone();
if current_epoch <= participant.joined {
warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.address.to_string());
warn!("consensus: Voter ({}) joined after current epoch.", va);
return Ok((false, None))
}
@@ -542,7 +546,7 @@ impl ValidatorState {
self.consensus.participants.insert(participant.address, participant);
}
None => {
warn!(target: "consensus", "Voter ({}) is not a participant!", vote.address.to_string());
warn!("consensus: Voter ({}) is not a participant!", vote.address.to_string());
return Ok((false, None))
}
}
@@ -550,7 +554,7 @@ impl ValidatorState {
let proposal = match self.find_proposal(&vote.proposal) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus", "find_proposal() failed: {}", e);
error!("consensus: find_proposal() failed: {}", e);
return Err(e)
}
};
@@ -581,7 +585,7 @@ impl ValidatorState {
to_broadcast = v;
}
Err(e) => {
error!(target: "consensus", "Block finalization failed: {}", e);
error!("consensus: Block finalization failed: {}", e);
return Err(e)
}
}

View File

@@ -152,7 +152,7 @@ pub fn create_burn_proof(
let start = Instant::now();
let public_inputs = revealed.make_outputs();
let proof = Proof::create(pk, &[c], &public_inputs, &mut OsRng)?;
debug!("Prove: [{:?}]", start.elapsed());
debug!("Prove burn: [{:?}]", start.elapsed());
Ok((proof, revealed))
}
@@ -162,6 +162,9 @@ pub fn verify_burn_proof(
proof: &Proof,
revealed: &BurnRevealedValues,
) -> Result<()> {
let start = Instant::now();
let public_inputs = revealed.make_outputs();
Ok(proof.verify(vk, &public_inputs)?)
proof.verify(vk, &public_inputs)?;
debug!("Verify burn: [{:?}]", start.elapsed());
Ok(())
}

View File

@@ -103,7 +103,7 @@ pub fn create_mint_proof(
let start = Instant::now();
let public_inputs = revealed.make_outputs();
let proof = Proof::create(pk, &[c], &public_inputs, &mut OsRng)?;
debug!("Prove: [{:?}]", start.elapsed());
debug!("Prove mint: [{:?}]", start.elapsed());
Ok((proof, revealed))
}
@@ -113,6 +113,9 @@ pub fn verify_mint_proof(
proof: &Proof,
revealed: &MintRevealedValues,
) -> Result<()> {
let start = Instant::now();
let public_inputs = revealed.make_outputs();
Ok(proof.verify(vk, &public_inputs)?)
proof.verify(vk, &public_inputs)?;
debug!("Verify mint: [{:?}]", start.elapsed());
Ok(())
}