darkfid2: moved p2p protos from validator to darkfid using wrapper structs for messages

This commit is contained in:
aggstam
2023-08-02 18:37:43 +03:00
parent 459170ff09
commit e71264d376
14 changed files with 67 additions and 59 deletions

View File

@@ -48,6 +48,9 @@ mod rpc_tx;
mod task;
use task::sync::sync_task;
/// P2P net protocols
mod proto;
/// Utility functions
mod utils;
use utils::{genesis_txs_total, spawn_consensus_p2p, spawn_sync_p2p};

View File

@@ -18,7 +18,7 @@
/// Block broadcast protocol
mod protocol_block;
pub use protocol_block::ProtocolBlock;
pub use protocol_block::{BlockInfoMessage, ProtocolBlock};
/// Block proposal broadcast protocol
mod protocol_proposal;

View File

@@ -22,7 +22,7 @@ use log::debug;
use smol::Executor;
use url::Url;
use crate::{
use darkfi::{
blockchain::BlockInfo,
impl_p2p_message,
net::{
@@ -32,11 +32,22 @@ use crate::{
validator::ValidatorPtr,
Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
impl_p2p_message!(BlockInfo, "block");
/// Auxiliary [`BlockInfo`] wrapper structure used for messaging.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct BlockInfoMessage(BlockInfo);
impl From<&BlockInfo> for BlockInfoMessage {
fn from(block: &BlockInfo) -> Self {
BlockInfoMessage(block.clone())
}
}
impl_p2p_message!(BlockInfoMessage, "block");
pub struct ProtocolBlock {
block_sub: MessageSubscription<BlockInfo>,
block_sub: MessageSubscription<BlockInfoMessage>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
@@ -54,9 +65,9 @@ impl ProtocolBlock {
"Adding ProtocolBlock to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<BlockInfo>().await;
msg_subsystem.add_dispatch::<BlockInfoMessage>().await;
let block_sub = channel.subscribe_msg::<BlockInfo>().await?;
let block_sub = channel.subscribe_msg::<BlockInfoMessage>().await?;
Ok(Arc::new(Self {
block_sub,
@@ -106,7 +117,7 @@ impl ProtocolBlock {
let block_copy = (*block).clone();
match self.validator.write().await.append_block(&block_copy).await {
match self.validator.write().await.append_block(&block_copy.0).await {
Ok(()) => self.p2p.broadcast_with_exclude(&block_copy, &exclude_list).await,
Err(e) => {
debug!(

View File

@@ -22,7 +22,7 @@ use log::debug;
use smol::Executor;
use url::Url;
use crate::{
use darkfi::{
impl_p2p_message,
net::{
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
@@ -31,11 +31,16 @@ use crate::{
validator::{consensus::Proposal, ValidatorPtr},
Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
impl_p2p_message!(Proposal, "proposal");
/// Auxiliary [`Proposal`] wrapper structure used for messaging.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct ProposalMessage(Proposal);
impl_p2p_message!(ProposalMessage, "proposal");
pub struct ProtocolProposal {
proposal_sub: MessageSubscription<Proposal>,
proposal_sub: MessageSubscription<ProposalMessage>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
@@ -53,9 +58,9 @@ impl ProtocolProposal {
"Adding ProtocolProposal to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<Proposal>().await;
msg_subsystem.add_dispatch::<ProposalMessage>().await;
let proposal_sub = channel.subscribe_msg::<Proposal>().await?;
let proposal_sub = channel.subscribe_msg::<ProposalMessage>().await?;
Ok(Arc::new(Self {
proposal_sub,
@@ -102,7 +107,7 @@ impl ProtocolProposal {
let proposal_copy = (*proposal).clone();
match self.validator.write().await.consensus.append_proposal(&proposal_copy).await {
match self.validator.write().await.consensus.append_proposal(&proposal_copy.0).await {
Ok(()) => self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await,
Err(e) => {
debug!(

View File

@@ -21,9 +21,7 @@ use async_trait::async_trait;
use log::{debug, error};
use smol::Executor;
use darkfi_serial::{SerialDecodable, SerialEncodable};
use crate::{
use darkfi::{
blockchain::BlockInfo,
impl_p2p_message,
net::{
@@ -33,6 +31,7 @@ use crate::{
validator::ValidatorPtr,
Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
// Constant defining how many blocks we send during syncing.
const BATCH: u64 = 10;

View File

@@ -22,7 +22,7 @@ use log::debug;
use smol::Executor;
use url::Url;
use crate::{
use darkfi::{
impl_p2p_message,
net::{
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
@@ -32,11 +32,16 @@ use crate::{
validator::ValidatorPtr,
Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
impl_p2p_message!(Transaction, "tx");
/// Auxiliary [`Transaction`] wrapper structure used for messaging.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct TransactionMessage(Transaction);
impl_p2p_message!(TransactionMessage, "tx");
pub struct ProtocolTx {
tx_sub: MessageSubscription<Transaction>,
tx_sub: MessageSubscription<TransactionMessage>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
@@ -54,9 +59,9 @@ impl ProtocolTx {
"Adding ProtocolTx to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<Transaction>().await;
msg_subsystem.add_dispatch::<TransactionMessage>().await;
let tx_sub = channel.subscribe_msg::<Transaction>().await?;
let tx_sub = channel.subscribe_msg::<TransactionMessage>().await?;
Ok(Arc::new(Self {
tx_sub,
@@ -98,7 +103,7 @@ impl ProtocolTx {
let tx_copy = (*tx).clone();
// Nodes use unconfirmed_txs vector as seen_txs pool.
match self.validator.write().await.append_tx(tx_copy.clone()).await {
match self.validator.write().await.append_tx(&tx_copy.0).await {
Ok(()) => self.p2p.broadcast_with_exclude(&tx_copy, &exclude_list).await,
Err(e) => {
debug!(

View File

@@ -120,7 +120,7 @@ impl Darkfid {
// Consensus participants can directly perform
// the state transition check and append to their
// pending transactions store.
if self.validator.write().await.append_tx(tx.clone()).await.is_err() {
if self.validator.write().await.append_tx(&tx).await.is_err() {
error!(target: "darkfid::rpc::tx_broadcast", "Failed to append transaction to mempool");
return server_error(RpcError::TxSimulationFail, id, None)
}

View File

@@ -16,14 +16,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::{
util::async_util::sleep,
validator::proto::{SyncRequest, SyncResponse},
Result,
};
use darkfi::{util::async_util::sleep, Result};
use log::{debug, info, warn};
use crate::Darkfid;
use crate::{
proto::{SyncRequest, SyncResponse},
Darkfid,
};
/// async task used for block syncing
pub async fn sync_task(node: &Darkfid) -> Result<()> {

View File

@@ -36,6 +36,7 @@ use log::error;
use url::Url;
use crate::{
proto::BlockInfoMessage,
task::sync::sync_task,
utils::{genesis_txs_total, spawn_consensus_p2p, spawn_sync_p2p},
Darkfid,
@@ -147,7 +148,7 @@ impl Harness {
pub async fn add_blocks(&self, blocks: &[BlockInfo]) -> Result<()> {
// We simply broadcast the block using Alice's sync P2P
for block in blocks {
self.alice.sync_p2p.broadcast(block).await;
self.alice.sync_p2p.broadcast(&BlockInfoMessage::from(block)).await;
}
// and then add it to her chain

View File

@@ -22,10 +22,7 @@ use darkfi::{
error::TxVerifyFailed,
net::{P2p, P2pPtr, Settings, SESSION_ALL},
tx::Transaction,
validator::{
proto::{ProtocolBlock, ProtocolProposal, ProtocolSync, ProtocolTx},
ValidatorPtr,
},
validator::ValidatorPtr,
Result,
};
use darkfi_consensus_contract::{
@@ -35,6 +32,8 @@ use darkfi_money_contract::{model::MoneyTokenMintParamsV1, MoneyFunction::Genesi
use darkfi_sdk::crypto::{CONSENSUS_CONTRACT_ID, MONEY_CONTRACT_ID};
use darkfi_serial::deserialize;
use crate::proto::{ProtocolBlock, ProtocolProposal, ProtocolSync, ProtocolTx};
/// Auxiliary function to calculate the total amount of minted tokens in provided
/// genesis transactions set. This includes both staked and normal tokens.
/// If a non-genesis transaction is found, execution fails.

View File

@@ -24,14 +24,17 @@ use url::Url;
use crate::{
consensus::ValidatorStatePtr,
impl_p2p_message,
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
tx::Transaction,
Result,
};
impl_p2p_message!(Transaction, "tx");
pub struct ProtocolTx {
tx_sub: MessageSubscription<Transaction>,
jobsman: ProtocolJobsManagerPtr,

View File

@@ -72,7 +72,7 @@ macro_rules! cli_desc {
"{} {}-{}\n{}\n{}",
env!("CARGO_PKG_NAME").to_string(),
env!("CARGO_PKG_VERSION").to_string(),
"@GIT_REV@", // Only noobs will change this
"459170ff096", // Only noobs will change this
env!("CARGO_PKG_DESCRIPTION").to_string(),
darkfi::ANSI_LOGO,
);

View File

@@ -70,9 +70,8 @@ impl Consensus {
/// 1. Node has not started current slot finalization
/// 2. Proposal refers to current slot
/// 3. Proposal hash matches the actual block one
/// 4. Header hash matches the actual one
/// 5. Block transactions don't exceed set limit
/// 6. Block is valid
/// 4. Block transactions don't exceed set limit
/// 5. Block is valid
/// Additional validity rules can be applied.
pub async fn append_proposal(&mut self, proposal: &Proposal) -> Result<()> {
// Generate a time keeper for current slot
@@ -102,16 +101,6 @@ impl Consensus {
return Err(Error::ProposalHashesMissmatchError)
}
// Check if proposal header matches actual one
let proposal_header = hdr.headerhash();
if proposal.header != proposal_header {
warn!(
target: "validator::consensus::append_proposal", "Received proposal contains mismatched headers: {} - {}",
proposal.header, proposal_header
);
return Err(Error::ProposalHeadersMissmatchError)
}
// TODO: verify if this should happen here or not.
// Check that proposal transactions don't exceed limit
if proposal.block.txs.len() > TXS_CAP {
@@ -257,8 +246,6 @@ impl Consensus {
pub struct Proposal {
/// Block hash
pub hash: blake3::Hash,
/// Block header hash
pub header: blake3::Hash,
/// Block data
pub block: BlockInfo,
}
@@ -266,8 +253,7 @@ pub struct Proposal {
impl Proposal {
pub fn new(block: BlockInfo) -> Self {
let hash = block.blockhash();
let header = block.header.headerhash();
Self { hash, header, block }
Self { hash, block }
}
}

View File

@@ -37,9 +37,6 @@ use consensus::{next_block_reward, Consensus};
pub mod verification;
use verification::{verify_block, verify_genesis_block, verify_transactions};
/// P2P net protocols
pub mod proto;
/// Helper utilities
pub mod utils;
use utils::deploy_native_contracts;
@@ -128,8 +125,8 @@ impl Validator {
/// The node retrieves a transaction, validates its state transition,
/// and appends it to the pending txs store.
pub async fn append_tx(&mut self, tx: Transaction) -> Result<()> {
let tx_hash = blake3::hash(&serialize(&tx));
pub async fn append_tx(&mut self, tx: &Transaction) -> Result<()> {
let tx_hash = blake3::hash(&serialize(tx));
// Check if we have already seen this tx
let tx_in_txstore = self.blockchain.transactions.contains(&tx_hash)?;
@@ -142,7 +139,7 @@ impl Validator {
// Verify state transition
info!(target: "validator::append_tx", "Starting state transition validation");
let tx_vec = [tx];
let tx_vec = [tx.clone()];
let mut valid = false;
// Generate a time keeper for current slot