consensus: Don't use wrapping struct for transactions.

This commit is contained in:
parazyd
2022-05-01 10:12:53 +02:00
parent 0d708d935b
commit 0fd0cee7fb
10 changed files with 55 additions and 56 deletions

View File

@@ -142,7 +142,7 @@ struct Args {
pub struct Darkfid {
synced: Mutex<bool>, // AtomicBool is weird in Arc
consensus_p2p: Option<P2pPtr>,
_consensus_p2p: Option<P2pPtr>,
sync_p2p: Option<P2pPtr>,
client: Arc<Client>,
validator_state: ValidatorStatePtr,
@@ -190,7 +190,13 @@ impl Darkfid {
let client = validator_state.read().await.client.clone();
debug!("Released validator state lock");
Ok(Self { synced: Mutex::new(false), consensus_p2p, sync_p2p, client, validator_state })
Ok(Self {
synced: Mutex::new(false),
_consensus_p2p: consensus_p2p,
sync_p2p,
client,
validator_state,
})
}
}

View File

@@ -4,7 +4,6 @@ use log::{error, warn};
use serde_json::{json, Value};
use darkfi::{
consensus::Tx,
crypto::{address::Address, keypair::PublicKey, token_id::generate_id},
rpc::{
jsonrpc,
@@ -117,7 +116,7 @@ impl Darkfid {
};
if let Some(sync_p2p) = &self.sync_p2p {
match sync_p2p.broadcast(Tx(tx.clone())).await {
match sync_p2p.broadcast(tx.clone()).await {
Ok(()) => {}
Err(e) => {
error!("transfer(): Failed broadcasting transaction: {}", e);

View File

@@ -48,7 +48,7 @@ enum DrkSubcommand {
#[clap(long)]
/// JSON-RPC endpoint of the faucet
endpoint: Url,
faucet_endpoint: Url,
/// f64 amount requested for airdrop
amount: f64,
@@ -269,8 +269,8 @@ async fn main() -> Result<()> {
match args.command {
DrkSubcommand::Ping => drk.ping().await,
DrkSubcommand::Airdrop { address, endpoint, amount } => {
drk.airdrop(address, endpoint, amount).await
DrkSubcommand::Airdrop { address, faucet_endpoint, amount } => {
drk.airdrop(address, faucet_endpoint, amount).await
}
DrkSubcommand::Wallet { keygen, balance, address, all_addresses } => {

View File

@@ -20,7 +20,7 @@ use darkfi::{
consensus::{
proto::{ProtocolSync, ProtocolTx},
task::block_sync_task,
Timestamp, Tx, ValidatorState, ValidatorStatePtr, MAINNET_GENESIS_HASH_BYTES,
Timestamp, ValidatorState, ValidatorStatePtr, MAINNET_GENESIS_HASH_BYTES,
TESTNET_GENESIS_HASH_BYTES,
},
crypto::{address::Address, keypair::PublicKey, token_list::DrkTokenList},
@@ -257,7 +257,7 @@ impl Faucetd {
};
// Broadcast transaction to the network.
match self.sync_p2p.broadcast(Tx(tx.clone())).await {
match self.sync_p2p.broadcast(tx.clone()).await {
Ok(()) => {}
Err(e) => {
error!("airdrop(): Failed broadcasting transaction: {}", e);

View File

@@ -1,7 +1,7 @@
use sled::Batch;
use crate::{
consensus::Tx,
tx::Transaction,
util::serial::{deserialize, serialize},
Error, Result,
};
@@ -17,11 +17,11 @@ impl TxStore {
Ok(Self(tree))
}
/// Insert a slice of [`Tx`] into the txstore. With sled, the
/// Insert a slice of [`Transaction`] into the txstore. With sled, the
/// operation is done as a batch.
/// The transactions are hashed with BLAKE3 and this hash is
/// used as the key, while value is the serialized tx itself.
pub fn insert(&self, txs: &[Tx]) -> Result<Vec<blake3::Hash>> {
pub fn insert(&self, txs: &[Transaction]) -> Result<Vec<blake3::Hash>> {
let mut ret = Vec::with_capacity(txs.len());
let mut batch = Batch::default();
for i in txs {
@@ -42,8 +42,12 @@ impl TxStore {
/// Fetch requested transactions from the txstore. The `strict` param
/// will make the function fail if a transaction has not been found.
pub fn get(&self, tx_hashes: &[blake3::Hash], strict: bool) -> Result<Vec<Option<Tx>>> {
let mut ret: Vec<Option<Tx>> = Vec::with_capacity(tx_hashes.len());
pub fn get(
&self,
tx_hashes: &[blake3::Hash],
strict: bool,
) -> Result<Vec<Option<Transaction>>> {
let mut ret: Vec<Option<Transaction>> = Vec::with_capacity(tx_hashes.len());
for i in tx_hashes {
if let Some(found) = self.0.get(i.as_bytes())? {
@@ -63,7 +67,7 @@ impl TxStore {
/// Retrieve all transactions.
/// Be careful as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<Option<(blake3::Hash, Tx)>>> {
pub fn get_all(&self) -> Result<Vec<Option<(blake3::Hash, Transaction)>>> {
let mut txs = vec![];
let iterator = self.0.into_iter().enumerate();
for (_, r) in iterator {

View File

@@ -2,10 +2,11 @@ use std::io;
use log::debug;
use super::{Metadata, StreamletMetadata, Timestamp, Tx};
use super::{Metadata, StreamletMetadata, Timestamp};
use crate::{
crypto::{address::Address, keypair::PublicKey, schnorr::Signature},
impl_vec, net,
tx::Transaction,
util::serial::{serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
@@ -62,7 +63,7 @@ pub struct BlockInfo {
/// Slot UID, generated by the beacon
pub sl: u64,
/// Transactions payload
pub txs: Vec<Tx>,
pub txs: Vec<Transaction>,
/// Additional proposal information
pub metadata: Metadata,
/// Proposal information used by Streamlet consensus
@@ -73,7 +74,7 @@ impl BlockInfo {
pub fn new(
st: blake3::Hash,
sl: u64,
txs: Vec<Tx>,
txs: Vec<Transaction>,
metadata: Metadata,
sm: StreamletMetadata,
) -> Self {
@@ -123,7 +124,7 @@ impl BlockProposal {
address: Address,
st: blake3::Hash,
sl: u64,
txs: Vec<Tx>,
txs: Vec<Transaction>,
metadata: Metadata,
sm: StreamletMetadata,
) -> Self {
@@ -140,7 +141,7 @@ impl BlockProposal {
pub fn to_proposal_hash(
st: blake3::Hash,
sl: u64,
transactions: &[Tx],
transactions: &[Transaction],
metadata: &Metadata,
) -> blake3::Hash {
let mut txs = Vec::with_capacity(transactions.len());

View File

@@ -2,10 +2,6 @@
pub mod block;
pub use block::{Block, BlockInfo, BlockProposal, ProposalChain};
/// Transactions
pub mod tx;
pub use tx::Tx;
/// Consensus metadata
pub mod metadata;
pub use metadata::{Metadata, StreamletMetadata};

View File

@@ -4,23 +4,31 @@ use async_trait::async_trait;
use log::{debug, error, warn};
use crate::{
consensus::{Tx, ValidatorState, ValidatorStatePtr},
consensus::{ValidatorState, ValidatorStatePtr},
net,
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
node::MemoryState,
tx::Transaction,
util::serial::serialize,
Result,
};
pub struct ProtocolTx {
tx_sub: MessageSubscription<Tx>,
tx_sub: MessageSubscription<Transaction>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl net::Message for Transaction {
fn name() -> &'static str {
"tx"
}
}
impl ProtocolTx {
pub async fn init(
channel: ChannelPtr,
@@ -29,9 +37,9 @@ impl ProtocolTx {
) -> Result<ProtocolBasePtr> {
debug!("Adding ProtocolTx to the protocol registry");
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<Tx>().await;
msg_subsystem.add_dispatch::<Transaction>().await;
let tx_sub = channel.subscribe_msg::<Tx>().await?;
let tx_sub = channel.subscribe_msg::<Transaction>().await?;
Ok(Arc::new(Self {
tx_sub,

View File

@@ -13,7 +13,7 @@ use rand::rngs::OsRng;
use super::{
Block, BlockInfo, BlockProposal, Metadata, Participant, ProposalChain, StreamletMetadata,
Timestamp, Tx, Vote,
Timestamp, Vote,
};
use crate::{
blockchain::Blockchain,
@@ -27,6 +27,7 @@ use crate::{
state::{state_transition, StateUpdate},
Client, MemoryState, State,
},
tx::Transaction,
util::serial::{serialize, Encodable, SerialDecodable, SerialEncodable},
Result,
};
@@ -117,7 +118,7 @@ pub struct ValidatorState {
/// Client providing wallet access
pub client: Arc<Client>,
/// Pending transactions
pub unconfirmed_txs: Vec<Tx>,
pub unconfirmed_txs: Vec<Transaction>,
/// Participating start epoch
pub participating: Option<u64>,
}
@@ -168,7 +169,7 @@ impl ValidatorState {
/// The node retrieves a transaction and appends it to the unconfirmed
/// transactions list. Additional validity rules must be defined by the
/// protocol for transactions.
pub fn append_tx(&mut self, tx: Tx) -> bool {
pub fn append_tx(&mut self, tx: Transaction) -> bool {
if self.unconfirmed_txs.contains(&tx) {
debug!("append_tx(): We already have this tx");
return false
@@ -280,7 +281,7 @@ impl ValidatorState {
/// Retrieve all unconfirmed transactions not proposed in previous blocks
/// of provided index chain.
pub fn unproposed_txs(&self, index: i64) -> Vec<Tx> {
pub fn unproposed_txs(&self, index: i64) -> Vec<Transaction> {
let mut unproposed_txs = self.unconfirmed_txs.clone();
// If index is -1 (canonical blockchain) a new fork will be generated,
@@ -610,7 +611,7 @@ impl ValidatorState {
}
/// Remove provided transactions vector from unconfirmed_txs if they exist.
pub fn remove_txs(&mut self, transactions: Vec<Tx>) -> Result<()> {
pub fn remove_txs(&mut self, transactions: Vec<Transaction>) -> Result<()> {
for tx in transactions {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
@@ -664,7 +665,7 @@ impl ValidatorState {
chain.proposals.drain(0..(consecutive - 1));
info!(target: "consensus", "Adding finalized block to canonical chain");
info!(target: "consensus", "Adding {} finalized block to canonical chain", finalized.len());
let blockhashes = match self.blockchain.add(&finalized) {
Ok(v) => v,
Err(e) => {
@@ -676,6 +677,7 @@ impl ValidatorState {
for proposal in &finalized {
// TODO: Is this the right place? We're already doing this in protocol_sync.
// TODO: These state transitions have already been checked.
debug!(target: "consensus", "Applying state transition for finalized block");
let canon_state_clone = self.state_machine.lock().await.clone();
let mem_st = MemoryState::new(canon_state_clone);
let state_updates = ValidatorState::validate_state_transitions(mem_st, &proposal.txs)?;
@@ -839,12 +841,15 @@ impl ValidatorState {
/// Validate state transitions for given transactions and state and
/// return a vector of [`StateUpdate`]
pub fn validate_state_transitions(state: MemoryState, txs: &[Tx]) -> Result<Vec<StateUpdate>> {
pub fn validate_state_transitions(
state: MemoryState,
txs: &[Transaction],
) -> Result<Vec<StateUpdate>> {
let mut ret = vec![];
let mut st = state;
for (i, tx) in txs.iter().enumerate() {
let update = match state_transition(&st, tx.0.clone()) {
let update = match state_transition(&st, tx.clone()) {
Ok(v) => v,
Err(e) => {
warn!("validate_state_transition(): Failed for tx {}: {}", i, e);

View File

@@ -1,20 +0,0 @@
use std::io;
use crate::{
impl_vec, net,
tx::Transaction,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// Temporary transaction representation.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct Tx(pub Transaction);
impl net::Message for Tx {
fn name() -> &'static str {
"tx"
}
}
impl_vec!(Tx);