From 0cc61ccaa91d6ae7251ac89e9da8d8aedbbb60f1 Mon Sep 17 00:00:00 2001 From: aggstam Date: Fri, 21 Jul 2023 19:20:50 +0300 Subject: [PATCH] validator/proto: BlockInfo protocol added --- bin/darkfid2/src/main.rs | 13 ++- src/validator/mod.rs | 16 ++++ src/validator/proto/mod.rs | 4 + src/validator/proto/protocol_block.rs | 124 ++++++++++++++++++++++++++ src/validator/proto/protocol_tx.rs | 2 +- 5 files changed, 157 insertions(+), 2 deletions(-) create mode 100644 src/validator/proto/protocol_block.rs diff --git a/bin/darkfid2/src/main.rs b/bin/darkfid2/src/main.rs index 18cdc1965..9d7f7eb31 100644 --- a/bin/darkfid2/src/main.rs +++ b/bin/darkfid2/src/main.rs @@ -28,7 +28,10 @@ use darkfi::{ net::{settings::SettingsOpt, P2p, P2pPtr, SESSION_ALL}, rpc::server::listen_and_serve, util::time::TimeKeeper, - validator::{proto::ProtocolTx, Validator, ValidatorConfig, ValidatorPtr}, + validator::{ + proto::{ProtocolBlock, ProtocolTx}, + Validator, ValidatorConfig, ValidatorPtr, + }, Result, }; use darkfi_contract_test_harness::vks; @@ -141,6 +144,14 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let p2p = P2p::new(args.sync_net.into()).await; let registry = p2p.protocol_registry(); + let _validator = validator.clone(); + registry + .register(SESSION_ALL, move |channel, p2p| { + let validator = _validator.clone(); + async move { ProtocolBlock::init(channel, validator, p2p).await.unwrap() } + }) + .await; + let _validator = validator.clone(); registry .register(SESSION_ALL, move |channel, p2p| { diff --git a/src/validator/mod.rs b/src/validator/mod.rs index a389b6fa6..783c9ea35 100644 --- a/src/validator/mod.rs +++ b/src/validator/mod.rs @@ -161,6 +161,22 @@ impl Validator { Ok(()) } + /// The node retrieves a block and tries to add it if it doesn't + /// already exists. + pub async fn append_block(&mut self, block: &BlockInfo) -> Result<()> { + let block_hash = block.blockhash().to_string(); + + // Check if block already exists + if self.blockchain.has_block(block)? { + debug!(target: "validator::append_block", "We have already seen this block"); + return Err(Error::BlockAlreadyExists(block_hash)) + } + + self.add_blocks(&[block.clone()]).await?; + info!(target: "validator::append_block", "Block added: {}", block_hash); + Ok(()) + } + // ========================== // State transition functions // ========================== diff --git a/src/validator/proto/mod.rs b/src/validator/proto/mod.rs index aa87ba10e..949188265 100644 --- a/src/validator/proto/mod.rs +++ b/src/validator/proto/mod.rs @@ -16,6 +16,10 @@ * along with this program. If not, see . */ +/// Block broadcast protocol +mod protocol_block; +pub use protocol_block::ProtocolBlock; + /// Transaction broadcast protocol mod protocol_tx; pub use protocol_tx::ProtocolTx; diff --git a/src/validator/proto/protocol_block.rs b/src/validator/proto/protocol_block.rs new file mode 100644 index 000000000..6004023e7 --- /dev/null +++ b/src/validator/proto/protocol_block.rs @@ -0,0 +1,124 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2023 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use async_std::sync::Arc; +use async_trait::async_trait; +use log::debug; +use smol::Executor; +use url::Url; + +use crate::{ + blockchain::BlockInfo, + impl_p2p_message, + net::{ + ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + validator::ValidatorPtr, + Result, +}; + +pub struct ProtocolBlock { + block_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + validator: ValidatorPtr, + p2p: P2pPtr, + channel_address: Url, +} + +impl_p2p_message!(BlockInfo, "block"); + +impl ProtocolBlock { + pub async fn init( + channel: ChannelPtr, + validator: ValidatorPtr, + p2p: P2pPtr, + ) -> Result { + debug!( + target: "validator::protocol_block::init", + "Adding ProtocolTx to the protocol registry" + ); + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; + + let block_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + block_sub, + jobsman: ProtocolJobsManager::new("BlockProtocol", channel.clone()), + validator, + p2p, + channel_address: channel.address().clone(), + })) + } + + async fn handle_receive_block(self: Arc) -> Result<()> { + debug!(target: "consensus::protocol_block::handle_receive_block", "START"); + let exclude_list = vec![self.channel_address.clone()]; + loop { + let block = match self.block_sub.receive().await { + Ok(v) => v, + Err(e) => { + debug!( + target: "validator::protocol_block::handle_receive_block", + "recv fail: {}", + e + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + if !self.validator.read().await.synced { + debug!( + target: "validator::protocol_block::handle_receive_block", + "Node still syncing blockchain, skipping..." + ); + continue + } + + let block_copy = (*block).clone(); + + match self.validator.write().await.append_block(&block_copy).await { + Ok(()) => self.p2p.broadcast_with_exclude(&block_copy, &exclude_list).await, + Err(e) => { + debug!( + target: "validator::protocol_block::handle_receive_block", + "append_block fail: {}", + e + ); + } + }; + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolBlock { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!(target: "validator::protocol_block::start", "START"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await; + debug!(target: "validator::protocol_block::start", "END"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolBlock" + } +} diff --git a/src/validator/proto/protocol_tx.rs b/src/validator/proto/protocol_tx.rs index ede0e0db2..f5d62e0df 100644 --- a/src/validator/proto/protocol_tx.rs +++ b/src/validator/proto/protocol_tx.rs @@ -103,7 +103,7 @@ impl ProtocolTx { Err(e) => { debug!( target: "validator::protocol_tx::handle_receive_tx", - "append_tc fail: {}", + "append_tx fail: {}", e ); }