diff --git a/bin/darkfid2/src/main.rs b/bin/darkfid2/src/main.rs index 65675dcdc..70aacbeb3 100644 --- a/bin/darkfid2/src/main.rs +++ b/bin/darkfid2/src/main.rs @@ -142,7 +142,7 @@ struct Args { pub struct Darkfid { synced: Mutex, // AtomicBool is weird in Arc - consensus_p2p: Option, + _consensus_p2p: Option, sync_p2p: Option, client: Arc, validator_state: ValidatorStatePtr, @@ -190,7 +190,13 @@ impl Darkfid { let client = validator_state.read().await.client.clone(); debug!("Released validator state lock"); - Ok(Self { synced: Mutex::new(false), consensus_p2p, sync_p2p, client, validator_state }) + Ok(Self { + synced: Mutex::new(false), + _consensus_p2p: consensus_p2p, + sync_p2p, + client, + validator_state, + }) } } diff --git a/bin/darkfid2/src/rpc_tx.rs b/bin/darkfid2/src/rpc_tx.rs index 8c4680a77..cf9f25d5a 100644 --- a/bin/darkfid2/src/rpc_tx.rs +++ b/bin/darkfid2/src/rpc_tx.rs @@ -4,7 +4,6 @@ use log::{error, warn}; use serde_json::{json, Value}; use darkfi::{ - consensus::Tx, crypto::{address::Address, keypair::PublicKey, token_id::generate_id}, rpc::{ jsonrpc, @@ -117,7 +116,7 @@ impl Darkfid { }; if let Some(sync_p2p) = &self.sync_p2p { - match sync_p2p.broadcast(Tx(tx.clone())).await { + match sync_p2p.broadcast(tx.clone()).await { Ok(()) => {} Err(e) => { error!("transfer(): Failed broadcasting transaction: {}", e); diff --git a/bin/drk/src/main.rs b/bin/drk/src/main.rs index 54564933c..3a2b8c5b2 100644 --- a/bin/drk/src/main.rs +++ b/bin/drk/src/main.rs @@ -48,7 +48,7 @@ enum DrkSubcommand { #[clap(long)] /// JSON-RPC endpoint of the faucet - endpoint: Url, + faucet_endpoint: Url, /// f64 amount requested for airdrop amount: f64, @@ -269,8 +269,8 @@ async fn main() -> Result<()> { match args.command { DrkSubcommand::Ping => drk.ping().await, - DrkSubcommand::Airdrop { address, endpoint, amount } => { - drk.airdrop(address, endpoint, amount).await + DrkSubcommand::Airdrop { address, faucet_endpoint, amount } => { + drk.airdrop(address, faucet_endpoint, amount).await } DrkSubcommand::Wallet { keygen, balance, address, all_addresses } => { diff --git a/bin/faucetd/src/main.rs b/bin/faucetd/src/main.rs index f792095ac..60b601e35 100644 --- a/bin/faucetd/src/main.rs +++ b/bin/faucetd/src/main.rs @@ -20,7 +20,7 @@ use darkfi::{ consensus::{ proto::{ProtocolSync, ProtocolTx}, task::block_sync_task, - Timestamp, Tx, ValidatorState, ValidatorStatePtr, MAINNET_GENESIS_HASH_BYTES, + Timestamp, ValidatorState, ValidatorStatePtr, MAINNET_GENESIS_HASH_BYTES, TESTNET_GENESIS_HASH_BYTES, }, crypto::{address::Address, keypair::PublicKey, token_list::DrkTokenList}, @@ -257,7 +257,7 @@ impl Faucetd { }; // Broadcast transaction to the network. - match self.sync_p2p.broadcast(Tx(tx.clone())).await { + match self.sync_p2p.broadcast(tx.clone()).await { Ok(()) => {} Err(e) => { error!("airdrop(): Failed broadcasting transaction: {}", e); diff --git a/src/blockchain/txstore.rs b/src/blockchain/txstore.rs index e9c07fe0f..6300e53e9 100644 --- a/src/blockchain/txstore.rs +++ b/src/blockchain/txstore.rs @@ -1,7 +1,7 @@ use sled::Batch; use crate::{ - consensus::Tx, + tx::Transaction, util::serial::{deserialize, serialize}, Error, Result, }; @@ -17,11 +17,11 @@ impl TxStore { Ok(Self(tree)) } - /// Insert a slice of [`Tx`] into the txstore. With sled, the + /// Insert a slice of [`Transaction`] into the txstore. With sled, the /// operation is done as a batch. /// The transactions are hashed with BLAKE3 and this hash is /// used as the key, while value is the serialized tx itself. - pub fn insert(&self, txs: &[Tx]) -> Result> { + pub fn insert(&self, txs: &[Transaction]) -> Result> { let mut ret = Vec::with_capacity(txs.len()); let mut batch = Batch::default(); for i in txs { @@ -42,8 +42,12 @@ impl TxStore { /// Fetch requested transactions from the txstore. The `strict` param /// will make the function fail if a transaction has not been found. - pub fn get(&self, tx_hashes: &[blake3::Hash], strict: bool) -> Result>> { - let mut ret: Vec> = Vec::with_capacity(tx_hashes.len()); + pub fn get( + &self, + tx_hashes: &[blake3::Hash], + strict: bool, + ) -> Result>> { + let mut ret: Vec> = Vec::with_capacity(tx_hashes.len()); for i in tx_hashes { if let Some(found) = self.0.get(i.as_bytes())? { @@ -63,7 +67,7 @@ impl TxStore { /// Retrieve all transactions. /// Be careful as this will try to load everything in memory. - pub fn get_all(&self) -> Result>> { + pub fn get_all(&self) -> Result>> { let mut txs = vec![]; let iterator = self.0.into_iter().enumerate(); for (_, r) in iterator { diff --git a/src/consensus/block.rs b/src/consensus/block.rs index b707761a7..62e5b02b5 100644 --- a/src/consensus/block.rs +++ b/src/consensus/block.rs @@ -2,10 +2,11 @@ use std::io; use log::debug; -use super::{Metadata, StreamletMetadata, Timestamp, Tx}; +use super::{Metadata, StreamletMetadata, Timestamp}; use crate::{ crypto::{address::Address, keypair::PublicKey, schnorr::Signature}, impl_vec, net, + tx::Transaction, util::serial::{serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, Result, }; @@ -62,7 +63,7 @@ pub struct BlockInfo { /// Slot UID, generated by the beacon pub sl: u64, /// Transactions payload - pub txs: Vec, + pub txs: Vec, /// Additional proposal information pub metadata: Metadata, /// Proposal information used by Streamlet consensus @@ -73,7 +74,7 @@ impl BlockInfo { pub fn new( st: blake3::Hash, sl: u64, - txs: Vec, + txs: Vec, metadata: Metadata, sm: StreamletMetadata, ) -> Self { @@ -123,7 +124,7 @@ impl BlockProposal { address: Address, st: blake3::Hash, sl: u64, - txs: Vec, + txs: Vec, metadata: Metadata, sm: StreamletMetadata, ) -> Self { @@ -140,7 +141,7 @@ impl BlockProposal { pub fn to_proposal_hash( st: blake3::Hash, sl: u64, - transactions: &[Tx], + transactions: &[Transaction], metadata: &Metadata, ) -> blake3::Hash { let mut txs = Vec::with_capacity(transactions.len()); diff --git a/src/consensus/mod.rs b/src/consensus/mod.rs index 3a689f5d1..9a7f9e9fd 100644 --- a/src/consensus/mod.rs +++ b/src/consensus/mod.rs @@ -2,10 +2,6 @@ pub mod block; pub use block::{Block, BlockInfo, BlockProposal, ProposalChain}; -/// Transactions -pub mod tx; -pub use tx::Tx; - /// Consensus metadata pub mod metadata; pub use metadata::{Metadata, StreamletMetadata}; diff --git a/src/consensus/proto/protocol_tx.rs b/src/consensus/proto/protocol_tx.rs index eee6e46dc..e01d9f824 100644 --- a/src/consensus/proto/protocol_tx.rs +++ b/src/consensus/proto/protocol_tx.rs @@ -4,23 +4,31 @@ use async_trait::async_trait; use log::{debug, error, warn}; use crate::{ - consensus::{Tx, ValidatorState, ValidatorStatePtr}, + consensus::{ValidatorState, ValidatorStatePtr}, + net, net::{ ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr, }, node::MemoryState, + tx::Transaction, util::serial::serialize, Result, }; pub struct ProtocolTx { - tx_sub: MessageSubscription, + tx_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, state: ValidatorStatePtr, p2p: P2pPtr, } +impl net::Message for Transaction { + fn name() -> &'static str { + "tx" + } +} + impl ProtocolTx { pub async fn init( channel: ChannelPtr, @@ -29,9 +37,9 @@ impl ProtocolTx { ) -> Result { debug!("Adding ProtocolTx to the protocol registry"); let msg_subsystem = channel.get_message_subsystem(); - msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; - let tx_sub = channel.subscribe_msg::().await?; + let tx_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { tx_sub, diff --git a/src/consensus/state.rs b/src/consensus/state.rs index 1723cf2e0..97c8989e5 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -13,7 +13,7 @@ use rand::rngs::OsRng; use super::{ Block, BlockInfo, BlockProposal, Metadata, Participant, ProposalChain, StreamletMetadata, - Timestamp, Tx, Vote, + Timestamp, Vote, }; use crate::{ blockchain::Blockchain, @@ -27,6 +27,7 @@ use crate::{ state::{state_transition, StateUpdate}, Client, MemoryState, State, }, + tx::Transaction, util::serial::{serialize, Encodable, SerialDecodable, SerialEncodable}, Result, }; @@ -117,7 +118,7 @@ pub struct ValidatorState { /// Client providing wallet access pub client: Arc, /// Pending transactions - pub unconfirmed_txs: Vec, + pub unconfirmed_txs: Vec, /// Participating start epoch pub participating: Option, } @@ -168,7 +169,7 @@ impl ValidatorState { /// The node retrieves a transaction and appends it to the unconfirmed /// transactions list. Additional validity rules must be defined by the /// protocol for transactions. - pub fn append_tx(&mut self, tx: Tx) -> bool { + pub fn append_tx(&mut self, tx: Transaction) -> bool { if self.unconfirmed_txs.contains(&tx) { debug!("append_tx(): We already have this tx"); return false @@ -280,7 +281,7 @@ impl ValidatorState { /// Retrieve all unconfirmed transactions not proposed in previous blocks /// of provided index chain. - pub fn unproposed_txs(&self, index: i64) -> Vec { + pub fn unproposed_txs(&self, index: i64) -> Vec { let mut unproposed_txs = self.unconfirmed_txs.clone(); // If index is -1 (canonical blockchain) a new fork will be generated, @@ -610,7 +611,7 @@ impl ValidatorState { } /// Remove provided transactions vector from unconfirmed_txs if they exist. - pub fn remove_txs(&mut self, transactions: Vec) -> Result<()> { + pub fn remove_txs(&mut self, transactions: Vec) -> Result<()> { for tx in transactions { if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) { self.unconfirmed_txs.remove(pos); @@ -664,7 +665,7 @@ impl ValidatorState { chain.proposals.drain(0..(consecutive - 1)); - info!(target: "consensus", "Adding finalized block to canonical chain"); + info!(target: "consensus", "Adding {} finalized block to canonical chain", finalized.len()); let blockhashes = match self.blockchain.add(&finalized) { Ok(v) => v, Err(e) => { @@ -676,6 +677,7 @@ impl ValidatorState { for proposal in &finalized { // TODO: Is this the right place? We're already doing this in protocol_sync. // TODO: These state transitions have already been checked. + debug!(target: "consensus", "Applying state transition for finalized block"); let canon_state_clone = self.state_machine.lock().await.clone(); let mem_st = MemoryState::new(canon_state_clone); let state_updates = ValidatorState::validate_state_transitions(mem_st, &proposal.txs)?; @@ -839,12 +841,15 @@ impl ValidatorState { /// Validate state transitions for given transactions and state and /// return a vector of [`StateUpdate`] - pub fn validate_state_transitions(state: MemoryState, txs: &[Tx]) -> Result> { + pub fn validate_state_transitions( + state: MemoryState, + txs: &[Transaction], + ) -> Result> { let mut ret = vec![]; let mut st = state; for (i, tx) in txs.iter().enumerate() { - let update = match state_transition(&st, tx.0.clone()) { + let update = match state_transition(&st, tx.clone()) { Ok(v) => v, Err(e) => { warn!("validate_state_transition(): Failed for tx {}: {}", i, e); diff --git a/src/consensus/tx.rs b/src/consensus/tx.rs deleted file mode 100644 index 8c6cd17b2..000000000 --- a/src/consensus/tx.rs +++ /dev/null @@ -1,20 +0,0 @@ -use std::io; - -use crate::{ - impl_vec, net, - tx::Transaction, - util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, - Result, -}; - -/// Temporary transaction representation. -#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)] -pub struct Tx(pub Transaction); - -impl net::Message for Tx { - fn name() -> &'static str { - "tx" - } -} - -impl_vec!(Tx);