consensus: use default wallet as node id

This commit is contained in:
aggstam
2022-04-25 21:25:53 +03:00
parent 145a7c9a51
commit a64b149ea2
9 changed files with 75 additions and 71 deletions

View File

@@ -7,7 +7,6 @@ use easy_parallel::Parallel;
use futures_lite::future;
use lazy_init::Lazy;
use log::{error, info};
use rand::Rng;
use serde_derive::Deserialize;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use structopt::StructOpt;
@@ -257,12 +256,13 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
return Err(Error::UnsupportedChain)
}
};
// TODO: Is this ok?
let mut rng = rand::thread_rng();
let id: u64 = rng.gen();
// TODO: sqldb init cleanup
Client::new(wallet.clone()).await?;
let address = wallet.get_default_address().await?;
// Initialize validator state
let state = ValidatorState::new(&sled_db, id, genesis_ts, genesis_data)?;
let state = ValidatorState::new(&sled_db, address, genesis_ts, genesis_data)?;
let sync_p2p = {
info!("Registering block sync P2P protocols...");

View File

@@ -4,7 +4,7 @@ use log::debug;
use super::{Metadata, StreamletMetadata, Timestamp, Tx};
use crate::{
crypto::{keypair::PublicKey, schnorr::Signature},
crypto::{address::Address, keypair::PublicKey, schnorr::Signature},
impl_vec, net,
util::serial::{serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
@@ -109,8 +109,8 @@ pub struct BlockProposal {
pub public_key: PublicKey,
/// Block signature
pub signature: Signature,
/// Leader ID
pub id: u64,
/// Leader address
pub address: Address,
/// Block data
pub block: BlockInfo,
}
@@ -120,7 +120,7 @@ impl BlockProposal {
pub fn new(
public_key: PublicKey,
signature: Signature,
id: u64,
address: Address,
st: blake3::Hash,
sl: u64,
txs: Vec<Tx>,
@@ -128,7 +128,7 @@ impl BlockProposal {
sm: StreamletMetadata,
) -> Self {
let block = BlockInfo::new(st, sl, txs, metadata, sm);
Self { public_key, signature, id, block }
Self { public_key, signature, address, block }
}
/// Produce proposal hash using `st`, `sl`, `txs`, and `metadata`.
@@ -156,7 +156,7 @@ impl PartialEq for BlockProposal {
fn eq(&self, other: &Self) -> bool {
self.public_key == other.public_key &&
self.signature == other.signature &&
self.id == other.id &&
self.address == other.address &&
self.block.st == other.block.st &&
self.block.sl == other.block.sl &&
self.block.txs == other.block.txs &&

View File

@@ -1,17 +1,18 @@
use std::{collections::BTreeMap, io};
use crate::{
crypto::address::Address,
impl_vec, net,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// This struct represents a tuple of the form:
/// (`node_id`, `epoch_joined`, `last_epoch_voted`)
/// (`node_address`, `epoch_joined`, `last_epoch_voted`)
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct Participant {
// Node ID
pub id: u64,
/// Node wallet address
pub address: Address,
/// Epoch node joined the network
pub joined: u64,
/// Last epoch node voted
@@ -19,8 +20,8 @@ pub struct Participant {
}
impl Participant {
pub fn new(id: u64, joined: u64) -> Self {
Self { id, joined, voted: None }
pub fn new(address: Address, joined: u64) -> Self {
Self { address, joined, voted: None }
}
}
@@ -30,7 +31,7 @@ impl net::Message for Participant {
}
}
impl Encodable for BTreeMap<u64, Participant> {
impl Encodable for BTreeMap<Address, Participant> {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += VarInt(self.len() as u64).encode(&mut s)?;
@@ -41,13 +42,13 @@ impl Encodable for BTreeMap<u64, Participant> {
}
}
impl Decodable for BTreeMap<u64, Participant> {
impl Decodable for BTreeMap<Address, Participant> {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let len = VarInt::decode(&mut d)?.0;
let mut ret = BTreeMap::new();
for _ in 0..len {
let participant: Participant = Decodable::decode(&mut d)?;
ret.insert(participant.id, participant);
ret.insert(participant.address, participant);
}
Ok(ret)
}

View File

@@ -17,6 +17,7 @@ use super::{
use crate::{
blockchain::Blockchain,
crypto::{
address::Address,
keypair::{PublicKey, SecretKey},
schnorr::{SchnorrPublic, SchnorrSecret},
},
@@ -40,10 +41,8 @@ pub struct ConsensusState {
/// Orphan votes pool, in case a vote reaches a node before the
/// corresponding block
pub orphan_votes: Vec<Vote>,
/// Node participation identity
pub participant: Option<Participant>,
/// Validators currently participating in the consensus
pub participants: BTreeMap<u64, Participant>,
pub participants: BTreeMap<Address, Participant>,
/// Validators to be added on the next epoch as participants
pub pending_participants: Vec<Participant>,
/// Last slot participants where refreshed
@@ -60,7 +59,6 @@ impl ConsensusState {
genesis_block,
proposals: vec![],
orphan_votes: vec![],
participant: None,
participants: BTreeMap::new(),
pending_participants: vec![],
refreshed: 0,
@@ -71,8 +69,8 @@ impl ConsensusState {
/// Auxiliary structure used for consensus syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct ConsensusRequest {
/// Validator ID
pub id: u64,
/// Validator wallet address
pub address: Address,
}
impl net::Message for ConsensusRequest {
@@ -99,11 +97,11 @@ pub type ValidatorStatePtr = Arc<RwLock<ValidatorState>>;
/// This struct represents the state of a validator node.
pub struct ValidatorState {
/// Validator ID
pub id: u64,
/// Node wallet address
pub address: Address,
/// Secret key, to sign messages
pub secret: SecretKey,
/// Validator public key
/// Node public key
pub public: PublicKey,
/// Hot/Live data used by the consensus algorithm
pub consensus: ConsensusState,
@@ -117,10 +115,9 @@ pub struct ValidatorState {
impl ValidatorState {
// TODO: Clock sync
// TODO: ID shouldn't be done like this
pub fn new(
db: &sled::Db, // <-- TODO: Avoid this with some wrapping, sled should only be in blockchain
id: u64,
address: Address,
genesis_ts: Timestamp,
genesis_data: blake3::Hash,
) -> Result<ValidatorStatePtr> {
@@ -132,7 +129,7 @@ impl ValidatorState {
let participating = false;
let state = Arc::new(RwLock::new(ValidatorState {
id,
address,
secret,
public,
consensus,
@@ -201,7 +198,7 @@ impl ValidatorState {
/// Find epoch leader, using a simple hash method.
/// Leader calculation is based on how many nodes are participating
/// in the network.
pub fn epoch_leader(&mut self) -> u64 {
pub fn epoch_leader(&mut self) -> Address {
let epoch = self.current_epoch();
// DefaultHasher is used to hash the epoch number
// because it produces a number string which then can be modulated by the len.
@@ -211,12 +208,13 @@ impl ValidatorState {
let pos = hasher.finish() % (self.consensus.participants.len() as u64);
// Since BTreeMap orders by key in asceding order, each node will have
// the same key in calculated position.
self.consensus.participants.iter().nth(pos as usize).unwrap().1.id
self.consensus.participants.iter().nth(pos as usize).unwrap().1.address
}
/// Check if we're the current epoch leader
pub fn is_epoch_leader(&mut self) -> bool {
self.id == self.epoch_leader()
let address = self.address;
address == self.epoch_leader()
}
/// Generate a block proposal for the current epoch, containing all
@@ -241,7 +239,7 @@ impl ValidatorState {
Ok(Some(BlockProposal::new(
self.public,
signed_proposal,
self.id,
self.address,
prev_hash,
epoch,
unproposed_txs,
@@ -312,10 +310,11 @@ impl ValidatorState {
self.refresh_participants()?;
let leader = self.epoch_leader();
if leader != proposal.id {
if leader != proposal.address {
warn!(
"Received proposal not from epoch leader ({}), but from ({})",
leader, proposal.id
leader,
proposal.address.to_string()
);
return Ok(None)
}
@@ -330,7 +329,7 @@ impl ValidatorState {
.as_bytes(),
&proposal.signature,
) {
warn!("Proposer ({}) signature could not be verified", proposal.id);
warn!("Proposer ({}) signature could not be verified", proposal.address.to_string());
return Ok(None)
}
@@ -384,7 +383,13 @@ impl ValidatorState {
}
let signed_hash = self.secret.sign(&serialize(&proposal_hash));
Ok(Some(Vote::new(self.public, signed_hash, proposal_hash, proposal.block.sl, self.id)))
Ok(Some(Vote::new(
self.public,
signed_hash,
proposal_hash,
proposal.block.sl,
self.address,
)))
}
/// Verify if the provided chain is notarized excluding the last block.
@@ -449,7 +454,7 @@ impl ValidatorState {
};
if !vote.public_key.verify(&encoded_proposal, &vote.vote) {
warn!(target: "consensus", "Voter ({}), signature couldn't be verified", vote.id);
warn!(target: "consensus", "Voter ({}), signature couldn't be verified", vote.address.to_string());
return Ok((false, None))
}
@@ -459,15 +464,15 @@ impl ValidatorState {
let node_count = self.consensus.participants.len();
// Checking that the voter can actually vote.
match self.consensus.participants.get(&vote.id) {
match self.consensus.participants.get(&vote.address) {
Some(participant) => {
if self.current_epoch() <= participant.joined {
warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.id);
warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.address.to_string());
return Ok((false, None))
}
}
None => {
warn!(target: "consensus", "Voter ({}) is not a participant!", vote.id);
warn!(target: "consensus", "Voter ({}) is not a participant!", vote.address.to_string());
return Ok((false, None))
}
}
@@ -513,9 +518,9 @@ impl ValidatorState {
}
// Updating participant vote
let mut participant = match self.consensus.participants.get(&vote.id) {
let mut participant = match self.consensus.participants.get(&vote.address) {
Some(p) => p.clone(),
None => Participant::new(vote.id, vote.sl),
None => Participant::new(vote.address, vote.sl),
};
match participant.voted {
@@ -527,7 +532,7 @@ impl ValidatorState {
None => participant.voted = Some(vote.sl),
}
self.consensus.participants.insert(participant.id, participant);
self.consensus.participants.insert(participant.address, participant);
Ok((true, Some(to_broadcast)))
}
@@ -646,12 +651,6 @@ impl ValidatorState {
Ok(finalized)
}
/// Append node participant identity to the pending participants list.
pub fn append_self_participant(&mut self, participant: Participant) {
self.consensus.participant = Some(participant.clone());
self.append_participant(participant);
}
/// Append a new participant to the pending participants list.
pub fn append_participant(&mut self, participant: Participant) -> bool {
if self.consensus.pending_participants.contains(&participant) {
@@ -677,7 +676,7 @@ impl ValidatorState {
debug!("refresh_participants(): Adding pending participants");
for participant in &self.consensus.pending_participants {
self.consensus.participants.insert(participant.id, participant.clone());
self.consensus.participants.insert(participant.address, participant.clone());
}
if self.consensus.participants.is_empty() {
@@ -711,7 +710,10 @@ impl ValidatorState {
match participant.voted {
Some(epoch) => {
if epoch < last_epoch {
warn!("refresh_participants(): Inactive participant: {:?}", participant);
warn!(
"refresh_participants(): Inactive participant: {:?}",
participant.address.to_string()
);
inactive.push(*index);
}
}
@@ -719,7 +721,10 @@ impl ValidatorState {
if participant.joined < previous_epoch &&
participant.joined < previous_from_last_epoch
{
warn!("refresh_participants(): Inactive participant: {:?}", participant);
warn!(
"refresh_participants(): Inactive participant: {:?}",
participant.address.to_string()
);
inactive.push(*index);
}
}
@@ -732,10 +737,8 @@ impl ValidatorState {
if self.consensus.participants.is_empty() {
// If no nodes are active, node becomes a single node network.
let mut participant = self.consensus.participant.clone().unwrap();
participant.joined = epoch;
self.consensus.participant = Some(participant.clone());
self.consensus.participants.insert(participant.id, participant.clone());
let participant = Participant::new(self.address, self.current_epoch());
self.consensus.participants.insert(participant.address, participant.clone());
}
self.consensus.refreshed = epoch;
@@ -753,7 +756,6 @@ impl ValidatorState {
genesis_block,
proposals: vec![],
orphan_votes: vec![],
participant: None,
participants: BTreeMap::new(),
pending_participants: vec![],
refreshed: 0,

View File

@@ -25,7 +25,7 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul
let response_sub = channel.subscribe_msg::<ConsensusResponse>().await?;
// Node creates a `ConsensusRequest` and sends it
let request = ConsensusRequest { id: state.read().await.id };
let request = ConsensusRequest { address: state.read().await.address };
channel.send(request).await?;
// Node stores response data. Extra validations can be added here.

View File

@@ -37,8 +37,9 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
}
// Node signals the network that it will start participating
let participant = Participant::new(state.read().await.id, state.read().await.current_epoch());
state.write().await.append_self_participant(participant.clone());
let participant =
Participant::new(state.read().await.address, state.read().await.current_epoch());
state.write().await.append_participant(participant.clone());
match p2p.broadcast(participant).await {
Ok(()) => info!("Consensus participation message broadcasted successfully."),

View File

@@ -1,7 +1,7 @@
use std::io;
use crate::{
crypto::{keypair::PublicKey, schnorr::Signature},
crypto::{address::Address, keypair::PublicKey, schnorr::Signature},
impl_vec, net,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
@@ -18,8 +18,8 @@ pub struct Vote {
pub proposal: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Node ID
pub id: u64,
/// Node wallet address
pub address: Address,
}
impl Vote {
@@ -28,9 +28,9 @@ impl Vote {
vote: Signature,
proposal: blake3::Hash,
sl: u64,
id: u64,
address: Address,
) -> Self {
Self { public_key, vote, proposal, sl, id }
Self { public_key, vote, proposal, sl, address }
}
}

View File

@@ -12,7 +12,7 @@ enum AddressType {
Payment = 0,
}
#[derive(Copy, Clone, PartialEq, Debug)]
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Address(pub [u8; 37]);
impl Address {

View File

@@ -162,11 +162,11 @@ impl WalletDb {
Ok(Keypair { secret, public })
}
pub async fn get_default_address(&self) -> Result<Address> {
debug!("Returning default address");
let keypair = self.get_default_keypair().await?;
Ok(Address::from(keypair.public))
}