src/consensus: split state, fully sled integration, modified nodes-tool to extract all sled db data

This commit is contained in:
aggstam
2022-04-14 13:19:00 +03:00
parent 3658317512
commit 7a34ec3d45
16 changed files with 691 additions and 270 deletions

View File

@@ -1,3 +1,3 @@
/target
Cargo.lock
validatord_state_*
validatord_*

View File

@@ -9,6 +9,7 @@ features = ["blockchain"]
[dependencies]
blake3 = "1.3.1"
sled = "0.34.7"
[workspace]

View File

@@ -1,19 +1,328 @@
use std::{fs::File, io::Write};
use darkfi::{consensus::state::State, util::expand_path, Result};
use darkfi::{
consensus::{
block::{Block, BlockProposal, BlockStore},
blockchain::{Blockchain, ProposalsChain},
metadata::{Metadata, MetadataStore, OuroborosMetadata, StreamletMetadata},
participant::Participant,
state::{ConsensusState, ValidatorState},
tx::{Tx, TxStore},
util::Timestamp,
vote::Vote,
},
util::expand_path,
Result,
};
#[derive(Debug)]
struct ParticipantInfo {
_id: u64,
_joined: u64,
_voted: Option<u64>,
}
impl ParticipantInfo {
pub fn new(participant: &Participant) -> ParticipantInfo {
let _id = participant.id;
let _joined = participant.joined;
let _voted = participant.voted;
ParticipantInfo { _id, _joined, _voted }
}
}
#[derive(Debug)]
struct VoteInfo {
_proposal: blake3::Hash,
_sl: u64,
_id: u64,
}
impl VoteInfo {
pub fn new(vote: &Vote) -> VoteInfo {
let _proposal = vote.proposal;
let _sl = vote.sl;
let _id = vote.id;
VoteInfo { _proposal, _sl, _id }
}
}
#[derive(Debug)]
struct StreamletMetadataInfo {
_votes: Vec<VoteInfo>,
_notarized: bool,
_finalized: bool,
_participants: Vec<ParticipantInfo>,
}
impl StreamletMetadataInfo {
pub fn new(metadata: &StreamletMetadata) -> StreamletMetadataInfo {
let mut _votes = Vec::new();
for vote in &metadata.votes {
_votes.push(VoteInfo::new(&vote));
}
let _notarized = metadata.notarized;
let _finalized = metadata.finalized;
let mut _participants = Vec::new();
for participant in &metadata.participants {
_participants.push(ParticipantInfo::new(&participant));
}
StreamletMetadataInfo { _votes, _notarized, _finalized, _participants }
}
}
#[derive(Debug)]
struct OuroborosMetadataInfo {
_proof: String,
_r: String,
_s: String,
}
impl OuroborosMetadataInfo {
pub fn new(metadata: &OuroborosMetadata) -> OuroborosMetadataInfo {
let _proof = metadata.proof.clone();
let _r = metadata.r.clone();
let _s = metadata.s.clone();
OuroborosMetadataInfo { _proof, _r, _s }
}
}
#[derive(Debug)]
struct MetadataInfo {
_timestamp: Timestamp,
_om: OuroborosMetadataInfo,
_sm: StreamletMetadataInfo,
}
impl MetadataInfo {
pub fn new(metadata: &Metadata) -> MetadataInfo {
let _timestamp = metadata.timestamp.clone();
let _om = OuroborosMetadataInfo::new(&metadata.om);
let _sm = StreamletMetadataInfo::new(&metadata.sm);
MetadataInfo { _timestamp, _om, _sm }
}
}
#[derive(Debug)]
struct ProposalInfo {
_id: u64,
_st: blake3::Hash,
_sl: u64,
_txs: Vec<Tx>,
_metadata: MetadataInfo,
}
impl ProposalInfo {
pub fn new(proposal: &BlockProposal) -> ProposalInfo {
let _id = proposal.id;
let _st = proposal.st;
let _sl = proposal.sl;
let _txs = proposal.txs.clone();
let _metadata = MetadataInfo::new(&proposal.metadata);
ProposalInfo { _id, _st, _sl, _txs, _metadata }
}
}
#[derive(Debug)]
struct ProposalsInfoChain {
_proposals: Vec<ProposalInfo>,
}
impl ProposalsInfoChain {
pub fn new(proposals: &ProposalsChain) -> ProposalsInfoChain {
let mut _proposals = Vec::new();
for proposal in &proposals.proposals {
_proposals.push(ProposalInfo::new(&proposal));
}
ProposalsInfoChain { _proposals }
}
}
#[derive(Debug)]
struct ConsensusInfo {
_genesis: Timestamp,
_last_block: blake3::Hash,
_last_sl: u64,
_proposals: Vec<ProposalsInfoChain>,
}
impl ConsensusInfo {
pub fn new(consensus: &ConsensusState) -> ConsensusInfo {
let _genesis = consensus.genesis.clone();
let _last_block = consensus.last_block.clone();
let _last_sl = consensus.last_sl.clone();
let mut _proposals = Vec::new();
for proposal in &consensus.proposals {
_proposals.push(ProposalsInfoChain::new(&proposal));
}
ConsensusInfo { _genesis, _last_block, _last_sl, _proposals }
}
}
#[derive(Debug)]
struct BlockInfo {
_hash: blake3::Hash,
_st: blake3::Hash,
_sl: u64,
_txs: Vec<blake3::Hash>,
}
impl BlockInfo {
pub fn new(_hash: blake3::Hash, block: &Block) -> BlockInfo {
let _st = block.st;
let _sl = block.sl;
let _txs = block.txs.clone();
BlockInfo { _hash, _st, _sl, _txs }
}
}
#[derive(Debug)]
struct BlockInfoChain {
_blocks: Vec<BlockInfo>,
}
impl BlockInfoChain {
pub fn new(blockstore: &BlockStore) -> BlockInfoChain {
let mut _blocks = Vec::new();
let result = blockstore.get_all();
match result {
Ok(iter) => {
for item in iter.iter() {
match item {
Some((hash, block)) => _blocks.push(BlockInfo::new(hash.clone(), &block)),
None => (),
};
}
}
Err(e) => println!("Error: {:?}", e),
}
BlockInfoChain { _blocks }
}
}
#[derive(Debug)]
struct TxInfo {
_hash: blake3::Hash,
_payload: String,
}
impl TxInfo {
pub fn new(_hash: blake3::Hash, tx: &Tx) -> TxInfo {
let _payload = tx.payload.clone();
TxInfo { _hash, _payload }
}
}
#[derive(Debug)]
struct TxStoreInfo {
_transactions: Vec<TxInfo>,
}
impl TxStoreInfo {
pub fn new(txstore: &TxStore) -> TxStoreInfo {
let mut _transactions = Vec::new();
let result = txstore.get_all();
match result {
Ok(iter) => {
for item in iter.iter() {
match item {
Some((hash, tx)) => _transactions.push(TxInfo::new(hash.clone(), &tx)),
None => (),
};
}
}
Err(e) => println!("Error: {:?}", e),
}
TxStoreInfo { _transactions }
}
}
#[derive(Debug)]
struct HashedMetadataInfo {
_block: blake3::Hash,
_metadata: MetadataInfo,
}
impl HashedMetadataInfo {
pub fn new(_block: blake3::Hash, metadata: &Metadata) -> HashedMetadataInfo {
let _metadata = MetadataInfo::new(&metadata);
HashedMetadataInfo { _block, _metadata }
}
}
#[derive(Debug)]
struct MetadataStoreInfo {
_metadata: Vec<HashedMetadataInfo>,
}
impl MetadataStoreInfo {
pub fn new(metadatastore: &MetadataStore) -> MetadataStoreInfo {
let mut _metadata = Vec::new();
let result = metadatastore.get_all();
match result {
Ok(iter) => {
for item in iter.iter() {
match item {
Some((hash, m)) => {
_metadata.push(HashedMetadataInfo::new(hash.clone(), &m))
}
None => (),
};
}
}
Err(e) => println!("Error: {:?}", e),
}
MetadataStoreInfo { _metadata }
}
}
#[derive(Debug)]
struct BlockchainInfo {
_blocks: BlockInfoChain,
_transactions: TxStoreInfo,
_metadata: MetadataStoreInfo,
}
impl BlockchainInfo {
pub fn new(blockchain: &Blockchain) -> BlockchainInfo {
let _blocks = BlockInfoChain::new(&blockchain.blocks);
let _transactions = TxStoreInfo::new(&blockchain.transactions);
let _metadata = MetadataStoreInfo::new(&blockchain.metadata);
BlockchainInfo { _blocks, _transactions, _metadata }
}
}
#[derive(Debug)]
struct StateInfo {
_id: u64,
_consensus: ConsensusInfo,
_blockchain: BlockchainInfo,
_unconfirmed_txs: Vec<Tx>,
}
impl StateInfo {
pub fn new(state: &ValidatorState) -> StateInfo {
let _id = state.id;
let _consensus = ConsensusInfo::new(&state.consensus);
let _blockchain = BlockchainInfo::new(&state.blockchain);
let _unconfirmed_txs = state.unconfirmed_txs.clone();
StateInfo { _id, _consensus, _blockchain, _unconfirmed_txs }
}
}
fn main() -> Result<()> {
let nodes = 4;
let genesis = 1648383795;
for i in 0..4 {
for i in 0..nodes {
let path = format!("~/.config/darkfi/validatord_db_{:?}", i);
let database_path = expand_path(&path).unwrap();
println!("Export state from sled database: {:?}", database_path);
let database = sled::open(database_path).unwrap();
let state = State::load_current_state(genesis, i, &database).unwrap();
let state_string = format!("{:#?}", state.read().unwrap());
println!("Export data from sled database: {:?}", database_path);
let state = ValidatorState::new(database_path, i, genesis).unwrap();
let info = StateInfo::new(&*state.read().unwrap());
let info_string = format!("{:#?}", info);
let path = format!("validatord_state_{:?}", i);
let mut file = File::create(path)?;
file.write(state_string.as_bytes())?;
file.write(info_string.as_bytes())?;
}
Ok(())

View File

@@ -13,7 +13,7 @@ use structopt_toml::StructOptToml;
use darkfi::{
consensus::{
participant::Participant,
state::{State, StatePtr},
state::{ValidatorState, ValidatorStatePtr},
tx::Tx,
},
net,
@@ -95,7 +95,7 @@ struct Opt {
verbose: u8,
}
async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, database: &sled::Db) {
async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
// Node signals the network that it starts participating
let participant =
Participant::new(state.read().unwrap().id, state.read().unwrap().current_epoch());
@@ -136,7 +136,11 @@ async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, database: &sled::Db) {
debug!("Node did not vote for the proposed block.");
} else {
let vote = x.unwrap();
state.write().unwrap().receive_vote(&vote);
let result = state.write().unwrap().receive_vote(&vote);
match result {
Ok(_) => info!("Vote saved successfuly."),
Err(e) => error!("Vote save failed. Error: {:?}", e),
}
// Broadcasting block
let result = p2p.broadcast(unwrapped).await;
match result {
@@ -161,7 +165,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, database: &sled::Db) {
}
// Current node state is flushed to sled database
let result = state.read().unwrap().save(database);
let result = state.read().unwrap().save_consensus_state();
match result {
Ok(()) => (),
Err(e) => {
@@ -196,9 +200,8 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
// State setup
let genesis = opts.genesis;
let database_path = expand_path(&opts.database).unwrap();
let database = sled::open(database_path).unwrap();
let id = opts.id.clone();
let state = State::load_current_state(genesis, id, &database).unwrap();
let state = ValidatorState::new(database_path, id, genesis).unwrap();
// P2P registry setup
let p2p = net::P2p::new(network_settings).await;
@@ -265,13 +268,13 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
.spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await })
.detach();
proposal_task(p2p, state, &database).await;
proposal_task(p2p, state).await;
Ok(())
}
struct JsonRpcInterface {
state: StatePtr,
state: ValidatorStatePtr,
p2p: net::P2pPtr,
_rpc_listen_addr: SocketAddr,
}

View File

@@ -2,7 +2,7 @@ use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{participant::Participant, state::StatePtr},
consensus::{participant::Participant, state::ValidatorStatePtr},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
@@ -15,12 +15,16 @@ use std::sync::Arc;
pub struct ProtocolParticipant {
participant_sub: MessageSubscription<Participant>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolParticipant {
pub async fn init(channel: ChannelPtr, state: StatePtr, p2p: P2pPtr) -> ProtocolBasePtr {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Participant>().await;
@@ -46,7 +50,8 @@ impl ProtocolParticipant {
participant
);
if self.state.write().unwrap().append_participant((*participant).clone()) {
let pending_participants = self.state.read().unwrap().pending_participants.clone();
let pending_participants =
self.state.read().unwrap().consensus.pending_participants.clone();
for pending_participant in pending_participants {
self.p2p.broadcast(pending_participant.clone()).await?;
}

View File

@@ -2,7 +2,7 @@ use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{block::BlockProposal, state::StatePtr},
consensus::{block::BlockProposal, state::ValidatorStatePtr},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
@@ -15,12 +15,16 @@ use std::sync::Arc;
pub struct ProtocolProposal {
proposal_sub: MessageSubscription<BlockProposal>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolProposal {
pub async fn init(channel: ChannelPtr, state: StatePtr, p2p: P2pPtr) -> ProtocolBasePtr {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<BlockProposal>().await;
@@ -53,7 +57,7 @@ impl ProtocolProposal {
debug!("Node did not vote for the proposed block.");
} else {
let vote = x.unwrap();
self.state.write().unwrap().receive_vote(&vote);
self.state.write().unwrap().receive_vote(&vote)?;
// Broadcasting block to rest nodes
self.p2p.broadcast(proposal_copy).await?;
// Broadcasting vote

View File

@@ -2,7 +2,7 @@ use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{state::StatePtr, tx::Tx},
consensus::{state::ValidatorStatePtr, tx::Tx},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
@@ -15,12 +15,16 @@ use std::sync::Arc;
pub struct ProtocolTx {
tx_sub: MessageSubscription<Tx>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolTx {
pub async fn init(channel: ChannelPtr, state: StatePtr, p2p: P2pPtr) -> ProtocolBasePtr {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Tx>().await;

View File

@@ -2,7 +2,7 @@ use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{state::StatePtr, vote::Vote},
consensus::{state::ValidatorStatePtr, vote::Vote},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
@@ -15,12 +15,16 @@ use std::sync::Arc;
pub struct ProtocolVote {
vote_sub: MessageSubscription<Vote>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolVote {
pub async fn init(channel: ChannelPtr, state: StatePtr, p2p: P2pPtr) -> ProtocolBasePtr {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Vote>().await;
@@ -45,7 +49,7 @@ impl ProtocolVote {
vote
);
let vote_copy = (*vote).clone();
if self.state.write().unwrap().receive_vote(&vote_copy) {
if self.state.write().unwrap().receive_vote(&vote_copy)? {
self.p2p.broadcast(vote_copy).await?;
};
}