diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index 0f5055774..0715aafd6 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -30,7 +30,7 @@ use darkfi::{ async_daemonize, blockchain::BlockInfo, cli_desc, - net::{settings::SettingsOpt, P2pPtr}, + net::settings::SettingsOpt, rpc::{ client::RpcChadClient, jsonrpc::JsonSubscriber, @@ -60,10 +60,11 @@ use task::{consensus::ConsensusInitTaskConfig, consensus_init_task}; /// P2P net protocols mod proto; +use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr}; /// Utility functions mod utils; -use utils::{parse_blockchain_config, spawn_p2p}; +use utils::parse_blockchain_config; const CONFIG_FILE: &str = "darkfid_config.toml"; const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml"); @@ -186,8 +187,8 @@ impl MinerRpcCLient { /// Daemon structure pub struct Darkfid { - /// P2P network pointer - p2p: P2pPtr, + /// P2P network protocols handler. + p2p_handler: DarkfidP2pHandlerPtr, /// Validator(node) pointer validator: ValidatorPtr, /// Flag to specify node is a miner @@ -206,7 +207,7 @@ pub struct Darkfid { impl Darkfid { pub async fn new( - p2p: P2pPtr, + p2p_handler: DarkfidP2pHandlerPtr, validator: ValidatorPtr, miner: bool, txs_batch_size: usize, @@ -215,7 +216,7 @@ impl Darkfid { dnet_sub: JsonSubscriber, ) -> Self { Self { - p2p, + p2p_handler, validator, miner, txs_batch_size, @@ -288,8 +289,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals")); // Initialize P2P network - let p2p = - spawn_p2p(&blockchain_config.net.into(), &validator, &subscribers, ex.clone()).await?; + let p2p_handler = DarkfidP2pHandler::init(&blockchain_config.net.into(), &ex).await?; // Initialize JSON-RPC client to perform requests to minerd let rpc_client = if blockchain_config.miner { @@ -319,7 +319,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!(target: "darkfid", "Starting dnet subs task"); let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); let dnet_sub_ = dnet_sub.clone(); - let p2p_ = p2p.clone(); + let p2p_ = p2p_handler.p2p.clone(); let dnet_task = StoppableTask::new(); dnet_task.clone().start( async move { @@ -342,11 +342,11 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // Initialize node let darkfid = Darkfid::new( - p2p.clone(), - validator, + p2p_handler.clone(), + validator.clone(), blockchain_config.miner, txs_batch_size, - subscribers, + subscribers.clone(), rpc_client, dnet_sub, ) @@ -383,7 +383,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ); info!(target: "darkfid", "Starting P2P network"); - p2p.clone().start().await?; + p2p_handler.clone().start(&ex, &validator, &subscribers).await?; // Consensus protocol info!(target: "darkfid", "Starting consensus protocol task"); @@ -424,8 +424,8 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!(target: "darkfid", "Stopping JSON-RPC server..."); rpc_task.stop().await; - info!(target: "darkfid", "Stopping P2P network..."); - p2p.stop().await; + info!(target: "darkfid", "Stopping P2P network protocols handler..."); + p2p_handler.stop().await; info!(target: "darkfid", "Stopping consensus task..."); consensus_task.stop().await; diff --git a/bin/darkfid/src/proto/mod.rs b/bin/darkfid/src/proto/mod.rs index dd03c15ff..dd6694a1a 100644 --- a/bin/darkfid/src/proto/mod.rs +++ b/bin/darkfid/src/proto/mod.rs @@ -16,19 +16,130 @@ * along with this program. If not, see . */ +use std::{collections::HashMap, sync::Arc}; + +use darkfi::{ + net::{P2p, P2pPtr, Settings}, + rpc::jsonrpc::JsonSubscriber, + system::ExecutorPtr, + validator::ValidatorPtr, + Result, +}; +use log::info; + // TODO: Protocal functions need to be protected so peers can't spam us. /// Block proposal broadcast protocol mod protocol_proposal; -pub use protocol_proposal::{ProposalMessage, ProtocolProposal}; +pub use protocol_proposal::{ProposalMessage, ProtocolProposalHandler, ProtocolProposalHandlerPtr}; /// Validator blockchain sync protocol mod protocol_sync; pub use protocol_sync::{ - ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, ProtocolSync, - SyncRequest, SyncResponse, TipRequest, TipResponse, BATCH, + ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, ProtocolSyncHandler, + ProtocolSyncHandlerPtr, SyncRequest, SyncResponse, TipRequest, TipResponse, BATCH, }; /// Transaction broadcast protocol mod protocol_tx; -pub use protocol_tx::ProtocolTx; +pub use protocol_tx::{ProtocolTxHandler, ProtocolTxHandlerPtr}; + +/// Atomic pointer to the Darkfid P2P protocols handler. +pub type DarkfidP2pHandlerPtr = Arc; + +/// Darkfid P2P protocols handler. +pub struct DarkfidP2pHandler { + /// P2P network pointer + pub p2p: P2pPtr, + /// `ProtocolProposal` messages handler + proposals: ProtocolProposalHandlerPtr, + /// `ProtocolSync` messages handler + sync: ProtocolSyncHandlerPtr, + /// `ProtocolTx` messages handler + txs: ProtocolTxHandlerPtr, +} + +impl DarkfidP2pHandler { + /// Initialize a Darkfid P2P protocols handler. + /// + /// A new P2P instance is generated using provided settings and all + /// corresponding protocols are registered. + pub async fn init(settings: &Settings, executor: &ExecutorPtr) -> Result { + info!( + target: "darkfid::proto::mod::DarkfidP2pHandler::init", + "Initializing a new Darkfid P2P handler..." + ); + + // Generate a new P2P instance + let p2p = P2p::new(settings.clone(), executor.clone()).await?; + + // Generate a new `ProtocolProposal` messages handler + let proposals = ProtocolProposalHandler::init(&p2p).await; + + // Generate a new `ProtocolSync` messages handler + let sync = ProtocolSyncHandler::init(&p2p).await; + + // Generate a new `ProtocolTx` messages handler + let txs = ProtocolTxHandler::init(&p2p).await; + + info!( + target: "darkfid::proto::mod::DarkfidP2pHandler::init", + "Darkfid P2P handler generated successfully!" + ); + + Ok(Arc::new(Self { p2p, proposals, sync, txs })) + } + + /// Start the Darkfid P2P protocols handler for provided validator. + pub async fn start( + &self, + executor: &ExecutorPtr, + validator: &ValidatorPtr, + subscribers: &HashMap<&'static str, JsonSubscriber>, + ) -> Result<()> { + info!( + target: "darkfid::proto::mod::DarkfidP2pHandler::start", + "Starting the Darkfid P2P handler..." + ); + + // Start the `ProtocolProposal` messages handler + let subscriber = subscribers.get("proposals").unwrap().clone(); + self.proposals.start(executor, validator, &self.p2p, subscriber).await?; + + // Start the `ProtocolSync` messages handler + self.sync.start(executor, validator).await?; + + // Start the `ProtocolTx` messages handler + let subscriber = subscribers.get("txs").unwrap().clone(); + self.txs.start(executor, validator, subscriber).await?; + + // Start the P2P instance + self.p2p.clone().start().await?; + + info!( + target: "darkfid::proto::mod::DarkfidP2pHandler::start", + "Darkfid P2P handler started successfully!" + ); + + Ok(()) + } + + /// Stop the Darkfid P2P protocols handler. + pub async fn stop(&self) { + info!(target: "darkfid::proto::mod::DarkfidP2pHandler::stop", "Terminating Darkfid P2P handler..."); + + // Stop the P2P instance + self.p2p.stop().await; + + // Start the `ProtocolTx` messages handler + self.txs.stop().await; + + // Start the `ProtocolSync` messages handler + self.sync.stop().await; + + // Start the `ProtocolProposal` messages handler + self.proposals.stop().await; + + info!(target: "darkfid::proto::mod::DarkfidP2pHandler::stop", "Darkfid P2P handler terminated successfully!"); + } +} diff --git a/bin/darkfid/src/proto/protocol_proposal.rs b/bin/darkfid/src/proto/protocol_proposal.rs index 46680a258..99b5aa195 100644 --- a/bin/darkfid/src/proto/protocol_proposal.rs +++ b/bin/darkfid/src/proto/protocol_proposal.rs @@ -20,16 +20,19 @@ use std::sync::Arc; use async_trait::async_trait; use log::{debug, error, warn}; -use smol::Executor; use tinyjson::JsonValue; use darkfi::{ impl_p2p_message, net::{ - ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, - ProtocolJobsManager, ProtocolJobsManagerPtr, + protocol::protocol_generic::{ + ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr, + }, + session::SESSION_DEFAULT, + Message, P2pPtr, }, rpc::jsonrpc::JsonSubscriber, + system::ExecutorPtr, util::encoding::base64, validator::{consensus::Proposal, ValidatorPtr}, Error, Result, @@ -44,191 +47,217 @@ pub struct ProposalMessage(pub Proposal); impl_p2p_message!(ProposalMessage, "proposal"); -pub struct ProtocolProposal { - proposal_sub: MessageSubscription, - proposals_response_sub: MessageSubscription, - jobsman: ProtocolJobsManagerPtr, - validator: ValidatorPtr, - p2p: P2pPtr, - channel: ChannelPtr, - subscriber: JsonSubscriber, +/// Atomic pointer to the `ProtocolProposal` handler. +pub type ProtocolProposalHandlerPtr = Arc; + +/// Handler managing [`Proposal`] messages, over a generic P2P protocol. +pub struct ProtocolProposalHandler { + /// The generic handler for [`Proposal`] messages. + handler: ProtocolGenericHandlerPtr, } -impl ProtocolProposal { - pub async fn init( - channel: ChannelPtr, - validator: ValidatorPtr, - p2p: P2pPtr, - subscriber: JsonSubscriber, - ) -> Result { +impl ProtocolProposalHandler { + /// Initialize a generic prototocol handler for [`Proposal`] messages + /// and registers it to the provided P2P network, using the default session flag. + pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr { debug!( target: "darkfid::proto::protocol_proposal::init", "Adding ProtocolProposal to the protocol registry" ); - let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - let proposal_sub = channel.subscribe_msg::().await?; - let proposals_response_sub = channel.subscribe_msg::().await?; + let handler = ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await; - Ok(Arc::new(Self { - proposal_sub, - proposals_response_sub, - jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()), - validator, - p2p, - channel, - subscriber, - })) + Arc::new(Self { handler }) } - async fn handle_receive_proposal(self: Arc) -> Result<()> { - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START"); - let exclude_list = vec![self.channel.address().clone()]; - loop { - let proposal = match self.proposal_sub.receive().await { - Ok(v) => v, - Err(e) => { - debug!( - target: "darkfid::proto::protocol_proposal::handle_receive_proposal", - "recv fail: {e}" - ); - continue + /// Start the `ProtocolProposal` background task. + pub async fn start( + &self, + executor: &ExecutorPtr, + validator: &ValidatorPtr, + p2p: &P2pPtr, + subscriber: JsonSubscriber, + ) -> Result<()> { + debug!( + target: "darkfid::proto::protocol_proposal::start", + "Starting ProtocolProposal handler task..." + ); + + self.handler.task.clone().start( + handle_receive_proposal(self.handler.clone(), validator.clone(), p2p.clone(), subscriber), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"), } - }; + }, + Error::DetachedTaskStopped, + executor.clone(), + ); - // Check if node has finished syncing its blockchain - if !*self.validator.synced.read().await { - debug!( - target: "darkfid::proto::protocol_proposal::handle_receive_proposal", - "Node still syncing blockchain, skipping..." - ); - continue - } + debug!( + target: "darkfid::proto::protocol_proposal::start", + "ProtocolProposal handler task started!" + ); - let proposal_copy = (*proposal).clone(); - - match self.validator.append_proposal(&proposal_copy.0).await { - Ok(()) => { - self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await; - let enc_prop = - JsonValue::String(base64::encode(&serialize_async(&proposal_copy).await)); - self.subscriber.notify(vec![enc_prop].into()).await; - continue - } - Err(e) => { - debug!( - target: "darkfid::proto::protocol_proposal::handle_receive_proposal", - "append_proposal fail: {e}", - ); - - match e { - Error::ExtendedChainIndexNotFound => { /* Do nothing */ } - _ => continue, - } - } - }; - - // If proposal fork chain was not found, we ask our peer for its sequence - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence"); - - // Cleanup subscriber - if let Err(e) = self.proposals_response_sub.clean().await { - error!( - target: "darkfid::proto::protocol_proposal::handle_receive_proposal", - "Error during proposals response subscriber cleanup: {e}" - ); - continue - }; - - // Grab last known block to create the request and execute it - let last = match self.validator.blockchain.last() { - Ok(l) => l, - Err(e) => { - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Blockchain last retriaval failed: {e}"); - continue - } - }; - let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal_copy.0.hash) }; - if let Err(e) = self.channel.send(&request).await { - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Channel send failed: {e}"); - continue - }; - - // Node waits for response - let response = match self - .proposals_response_sub - .receive_with_timeout(self.p2p.settings().read().await.outbound_connect_timeout) - .await - { - Ok(r) => r, - Err(e) => { - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence failed: {e}"); - continue - } - }; - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer response: {response:?}"); - - // Verify and store retrieved proposals - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Processing received proposals"); - - // Response should not be empty - if response.proposals.is_empty() { - warn!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer responded with empty sequence, node might be out of sync!"); - continue - } - - // Sequence length must correspond to requested height - if response.proposals.len() as u32 != proposal_copy.0.block.header.height - last.0 { - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence length is erroneous"); - continue - } - - // First proposal must extend canonical - if response.proposals[0].block.header.previous != last.1 { - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't extend canonical"); - continue - } - - // Last proposal must be the same as the one requested - if response.proposals.last().unwrap().hash != proposal_copy.0.hash { - debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't correspond to requested tip"); - continue - } - - for proposal in &response.proposals { - match self.validator.append_proposal(proposal).await { - Ok(()) => { /* Do nothing */ } - // Skip already existing proposals - Err(Error::ProposalAlreadyExists) => continue, - Err(e) => { - error!( - target: "darkfid::proto::protocol_proposal::handle_receive_proposal", - "Error while appending response proposal: {e}" - ); - } - }; - let message = ProposalMessage(proposal.clone()); - self.p2p.broadcast_with_exclude(&message, &exclude_list).await; - // Notify subscriber - let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await)); - self.subscriber.notify(vec![enc_prop].into()).await; - } - } - } -} - -#[async_trait] -impl ProtocolBase for ProtocolProposal { - async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "darkfid::proto::protocol_proposal::start", "START"); - self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await; - debug!(target: "darkfid::proto::protocol_proposal::start", "END"); Ok(()) } - fn name(&self) -> &'static str { - "ProtocolProposal" + /// Stop the `ProtocolProposal` background task. + pub async fn stop(&self) { + debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task..."); + self.handler.task.stop().await; + debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!"); + } +} + +/// Background handler function for ProtocolProposal. +async fn handle_receive_proposal( + handler: ProtocolGenericHandlerPtr, + validator: ValidatorPtr, + p2p: P2pPtr, + subscriber: JsonSubscriber, +) -> Result<()> { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START"); + loop { + // Wait for a new proposal message + let (channel, proposal) = match handler.receiver.recv().await { + Ok(r) => r, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_proposal::handle_receive_proposal", + "recv fail: {e}" + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + if !*validator.synced.read().await { + debug!( + target: "darkfid::proto::protocol_proposal::handle_receive_proposal", + "Node still syncing blockchain, skipping..." + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + + // Append proposal + match validator.append_proposal(&proposal.0).await { + Ok(()) => { + // Signal handler to broadcast the valid proposal to rest nodes + handler.send_action(channel, ProtocolGenericAction::Broadcast).await; + + // Notify subscriber + let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await)); + subscriber.notify(vec![enc_prop].into()).await; + + continue + } + Err(e) => { + debug!( + target: "darkfid::proto::protocol_proposal::handle_receive_proposal", + "append_proposal fail: {e}", + ); + + handler.send_action(channel, ProtocolGenericAction::Skip).await; + + match e { + Error::ExtendedChainIndexNotFound => { /* Do nothing */ } + _ => continue, + } + } + }; + + // If proposal fork chain was not found, we ask our peer for its sequence + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence"); + let Some(channel) = p2p.get_channel(channel) else { + error!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Channel {channel} wasn't found."); + continue + }; + + // Communication setup + let Ok(response_sub) = channel.subscribe_msg::().await else { + error!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}"); + continue + }; + + // Grab last known block to create the request and execute it + let last = match validator.blockchain.last() { + Ok(l) => l, + Err(e) => { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Blockchain last retriaval failed: {e}"); + continue + } + }; + let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.0.hash) }; + if let Err(e) = channel.send(&request).await { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Channel send failed: {e}"); + continue + }; + + // Node waits for response + let response = match response_sub + .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout) + .await + { + Ok(r) => r, + Err(e) => { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence failed: {e}"); + continue + } + }; + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer response: {response:?}"); + + // Verify and store retrieved proposals + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Processing received proposals"); + + // Response should not be empty + if response.proposals.is_empty() { + warn!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer responded with empty sequence, node might be out of sync!"); + continue + } + + // Sequence length must correspond to requested height + if response.proposals.len() as u32 != proposal.0.block.header.height - last.0 { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence length is erroneous"); + continue + } + + // First proposal must extend canonical + if response.proposals[0].block.header.previous != last.1 { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't extend canonical"); + continue + } + + // Last proposal must be the same as the one requested + if response.proposals.last().unwrap().hash != proposal.0.hash { + debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't correspond to requested tip"); + continue + } + + // Process response proposals + for proposal in &response.proposals { + // Append proposal + match validator.append_proposal(proposal).await { + Ok(()) => { /* Do nothing */ } + // Skip already existing proposals + Err(Error::ProposalAlreadyExists) => continue, + Err(e) => { + error!( + target: "darkfid::proto::protocol_proposal::handle_receive_proposal", + "Error while appending response proposal: {e}" + ); + } + }; + + // Broadcast proposal to rest nodes + let message = ProposalMessage(proposal.clone()); + p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await; + + // Notify subscriber + let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await)); + subscriber.notify(vec![enc_prop].into()).await; + } } } diff --git a/bin/darkfid/src/proto/protocol_sync.rs b/bin/darkfid/src/proto/protocol_sync.rs index 0f13bdf99..5f4db2695 100644 --- a/bin/darkfid/src/proto/protocol_sync.rs +++ b/bin/darkfid/src/proto/protocol_sync.rs @@ -20,17 +20,20 @@ use std::sync::Arc; use async_trait::async_trait; use log::{debug, error}; -use smol::Executor; use darkfi::{ blockchain::{BlockInfo, Header, HeaderHash}, impl_p2p_message, net::{ - ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr, - ProtocolJobsManager, ProtocolJobsManagerPtr, + protocol::protocol_generic::{ + ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr, + }, + session::SESSION_DEFAULT, + Message, P2pPtr, }, + system::ExecutorPtr, validator::{consensus::Proposal, ValidatorPtr}, - Result, + Error, Result, }; use darkfi_serial::{SerialDecodable, SerialEncodable}; @@ -40,7 +43,7 @@ pub const BATCH: usize = 20; /// Structure represening a request to ask a node for their current /// canonical(finalized) tip block hash, if they are synced. We also /// include our own tip, so they can verify we follow the same sequence. -#[derive(Debug, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct TipRequest { /// Canonical(finalized) tip block hash pub tip: HeaderHash, @@ -51,7 +54,7 @@ impl_p2p_message!(TipRequest, "tiprequest"); /// Structure representing the response to `TipRequest`, /// containing a boolean flag to indicate if we are synced, /// and our canonical(finalized) tip block height and hash. -#[derive(Debug, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct TipResponse { /// Flag indicating the node is synced pub synced: bool, @@ -65,7 +68,7 @@ impl_p2p_message!(TipResponse, "tipresponse"); /// Structure represening a request to ask a node for up to `BATCH` headers before /// the provided header height. -#[derive(Debug, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct HeaderSyncRequest { /// Header height pub height: u32, @@ -75,7 +78,7 @@ impl_p2p_message!(HeaderSyncRequest, "headersyncrequest"); /// Structure representing the response to `HeaderSyncRequest`, /// containing up to `BATCH` headers before the requested block height. -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct HeaderSyncResponse { /// Response headers pub headers: Vec
, @@ -85,7 +88,7 @@ impl_p2p_message!(HeaderSyncResponse, "headersyncresponse"); /// Structure represening a request to ask a node for up to`BATCH` blocks /// of provided headers. -#[derive(Debug, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct SyncRequest { /// Header hashes pub headers: Vec, @@ -95,7 +98,7 @@ impl_p2p_message!(SyncRequest, "syncrequest"); /// Structure representing the response to `SyncRequest`, /// containing up to `BATCH` blocks after the requested block height. -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct SyncResponse { /// Response blocks pub blocks: Vec, @@ -108,7 +111,7 @@ impl_p2p_message!(SyncResponse, "syncresponse"); /// otherwise they respond with their best fork sequence. /// We also include our own canonical(finalized) tip, so they can verify /// we follow the same sequence. -#[derive(Debug, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct ForkSyncRequest { /// Canonical(finalized) tip block hash pub tip: HeaderHash, @@ -120,7 +123,7 @@ impl_p2p_message!(ForkSyncRequest, "forksyncrequest"); /// Structure representing the response to `ForkSyncRequest`, /// containing the requested fork sequence. -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct ForkSyncResponse { /// Response fork proposals pub proposals: Vec, @@ -128,308 +131,346 @@ pub struct ForkSyncResponse { impl_p2p_message!(ForkSyncResponse, "forksyncresponse"); -pub struct ProtocolSync { - tip_sub: MessageSubscription, - header_sub: MessageSubscription, - request_sub: MessageSubscription, - fork_request_sub: MessageSubscription, - jobsman: ProtocolJobsManagerPtr, - validator: ValidatorPtr, - channel: ChannelPtr, +/// Atomic pointer to the `ProtocolSync` handler. +pub type ProtocolSyncHandlerPtr = Arc; + +/// Handler managing all `ProtocolSync` messages, over generic P2P protocols. +pub struct ProtocolSyncHandler { + /// The generic handler for `TipRequest` messages. + tip_handler: ProtocolGenericHandlerPtr, + /// The generic handler for `HeaderSyncRequest` messages. + header_handler: ProtocolGenericHandlerPtr, + /// The generic handler for `SyncRequest` messages. + sync_handler: ProtocolGenericHandlerPtr, + /// The generic handler for `ForkSyncRequest` messages. + fork_sync_handler: ProtocolGenericHandlerPtr, } -impl ProtocolSync { - pub async fn init(channel: ChannelPtr, validator: ValidatorPtr) -> Result { +impl ProtocolSyncHandler { + /// Initialize the generic prototocol handlers for all `ProtocolSync` messages + /// and register them to the provided P2P network, using the default session flag. + pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr { debug!( target: "darkfid::proto::protocol_sync::init", - "Adding ProtocolSync to the protocol registry" + "Adding all sync protocols to the protocol registry" ); - let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - msg_subsystem.add_dispatch::().await; - let tip_sub = channel.subscribe_msg::().await?; - let header_sub = channel.subscribe_msg::().await?; - let request_sub = channel.subscribe_msg::().await?; - let fork_request_sub = channel.subscribe_msg::().await?; + let tip_handler = + ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await; + let header_handler = + ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await; + let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await; + let fork_sync_handler = + ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await; - Ok(Arc::new(Self { - tip_sub, - header_sub, - request_sub, - fork_request_sub, - jobsman: ProtocolJobsManager::new("SyncProtocol", channel.clone()), - validator, - channel, - })) + Arc::new(Self { tip_handler, header_handler, sync_handler, fork_sync_handler }) } - async fn handle_receive_tip_request(self: Arc) -> Result<()> { - debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START"); - loop { - let request = match self.tip_sub.receive().await { - Ok(v) => v, - Err(e) => { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_tip_request", - "recv fail: {e}" - ); - continue + /// Start all `ProtocolSync` background tasks. + pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> { + debug!( + target: "darkfid::proto::protocol_sync::start", + "Starting sync protocols handlers tasks..." + ); + + self.tip_handler.task.clone().start( + handle_receive_tip_request(self.tip_handler.clone(), validator.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"), } - }; + }, + Error::DetachedTaskStopped, + executor.clone(), + ); - // Check if node has finished syncing its blockchain - let response = if !*self.validator.synced.read().await { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_tip_request", - "Node still syncing blockchain, skipping..." - ); - TipResponse { synced: false, height: None, hash: None } - } else { - // Check we follow the same sequence - match self.validator.blockchain.blocks.contains(&request.tip) { - Ok(contains) => { - if !contains { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_tip_request", - "Node doesn't follow request sequence" - ); - continue - } - } - Err(e) => { - error!( - target: "darkfid::proto::protocol_sync::handle_receive_tip_request", - "block_store.contains fail: {e}" - ); - continue - } + self.header_handler.task.clone().start( + handle_receive_header_request(self.header_handler.clone(), validator.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"), } + }, + Error::DetachedTaskStopped, + executor.clone(), + ); - // Grab our current tip and return it - let tip = match self.validator.blockchain.last() { - Ok(v) => v, - Err(e) => { - error!( - target: "darkfid::proto::protocol_sync::handle_receive_tip_request", - "blockchain.last fail: {e}" - ); - continue - } - }; - - TipResponse { synced: true, height: Some(tip.0), hash: Some(tip.1) } - }; - - if let Err(e) = self.channel.send(&response).await { - error!( - target: "darkfid::proto::protocol_sync::handle_receive_tip_request", - "Channel send fail: {e}" - ) - }; - } - } - - async fn handle_receive_header_request(self: Arc) -> Result<()> { - debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START"); - loop { - let request = match self.header_sub.receive().await { - Ok(v) => v, - Err(e) => { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_header_request", - "recv fail: {}", - e - ); - continue + self.sync_handler.task.clone().start( + handle_receive_request(self.sync_handler.clone(), validator.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"), } - }; + }, + Error::DetachedTaskStopped, + executor.clone(), + ); - // Check if node has finished syncing its blockchain - if !*self.validator.synced.read().await { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_header_request", - "Node still syncing blockchain, skipping..." - ); - continue - } - - let headers = match self.validator.blockchain.get_headers_before(request.height, BATCH) - { - Ok(v) => v, - Err(e) => { - error!( - target: "darkfid::proto::protocol_sync::handle_receive_header_request", - "get_headers_before fail: {}", - e - ); - continue + self.fork_sync_handler.task.clone().start( + handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"), } - }; + }, + Error::DetachedTaskStopped, + executor.clone(), + ); - let response = HeaderSyncResponse { headers }; - if let Err(e) = self.channel.send(&response).await { - error!( - target: "darkfid::proto::protocol_sync::handle_receive_header_request", - "channel send fail: {}", - e - ) - }; - } - } + debug!( + target: "darkfid::proto::protocol_sync::start", + "Sync protocols handlers tasks started!" + ); - async fn handle_receive_request(self: Arc) -> Result<()> { - debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START"); - loop { - let request = match self.request_sub.receive().await { - Ok(v) => v, - Err(e) => { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_request", - "recv fail: {}", - e - ); - continue - } - }; - - // Check if node has finished syncing its blockchain - if !*self.validator.synced.read().await { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_request", - "Node still syncing blockchain, skipping..." - ); - continue - } - - // Check if request exists the configured limit - if request.headers.len() > BATCH { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_request", - "Node requested more blocks than allowed." - ); - continue - } - - let blocks = match self.validator.blockchain.get_blocks_by_hash(&request.headers) { - Ok(v) => v, - Err(e) => { - error!( - target: "darkfid::proto::protocol_sync::handle_receive_request", - "get_blocks_after fail: {}", - e - ); - continue - } - }; - - let response = SyncResponse { blocks }; - if let Err(e) = self.channel.send(&response).await { - error!( - target: "darkfid::proto::protocol_sync::handle_receive_request", - "channel send fail: {}", - e - ) - }; - } - } - - async fn handle_receive_fork_request(self: Arc) -> Result<()> { - debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START"); - loop { - let request = match self.fork_request_sub.receive().await { - Ok(v) => v, - Err(e) => { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_fork_request", - "recv fail: {}", - e - ); - continue - } - }; - - // Check if node has finished syncing its blockchain - if !*self.validator.synced.read().await { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_fork_request", - "Node still syncing blockchain, skipping..." - ); - continue - } - - debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}"); - - // If a fork tip is provided, grab its fork proposals sequence. - // Otherwise, grab best fork proposals sequence. - let proposals = match request.fork_tip { - Some(fork_tip) => { - self.validator - .consensus - .get_fork_proposals(request.tip, fork_tip, BATCH as u32) - .await - } - None => { - self.validator - .consensus - .get_best_fork_proposals(request.tip, BATCH as u32) - .await - } - }; - let proposals = match proposals { - Ok(p) => p, - Err(e) => { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_request", - "Getting fork proposals failed: {}", - e - ); - continue - } - }; - - let response = ForkSyncResponse { proposals }; - debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Response: {response:?}"); - if let Err(e) = self.channel.send(&response).await { - debug!( - target: "darkfid::proto::protocol_sync::handle_receive_fork_request", - "channel send fail: {}", - e - ) - }; - } - } -} - -#[async_trait] -impl ProtocolBase for ProtocolSync { - async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "darkfid::proto::protocol_sync::start", "START"); - self.jobsman.clone().start(executor.clone()); - self.jobsman - .clone() - .spawn(self.clone().handle_receive_tip_request(), executor.clone()) - .await; - self.jobsman - .clone() - .spawn(self.clone().handle_receive_header_request(), executor.clone()) - .await; - self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await; - self.jobsman - .clone() - .spawn(self.clone().handle_receive_fork_request(), executor.clone()) - .await; - debug!(target: "darkfid::proto::protocol_sync::start", "END"); Ok(()) } - fn name(&self) -> &'static str { - "ProtocolSync" + /// Stop all `ProtocolSync` background tasks. + pub async fn stop(&self) { + debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks..."); + self.tip_handler.task.stop().await; + self.header_handler.task.stop().await; + self.sync_handler.task.stop().await; + self.fork_sync_handler.task.stop().await; + debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!"); + } +} + +/// Background handler function for ProtocolSyncTip. +async fn handle_receive_tip_request( + handler: ProtocolGenericHandlerPtr, + validator: ValidatorPtr, +) -> Result<()> { + debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START"); + loop { + // Wait for a new tip request message + let (channel, request) = match handler.receiver.recv().await { + Ok(r) => r, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_tip_request", + "recv fail: {e}" + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + let response = if !*validator.synced.read().await { + TipResponse { synced: false, height: None, hash: None } + } else { + // Check we follow the same sequence + match validator.blockchain.blocks.contains(&request.tip) { + Ok(contains) => { + if !contains { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_tip_request", + "Node doesn't follow request sequence" + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + } + Err(e) => { + error!( + target: "darkfid::proto::protocol_sync::handle_receive_tip_request", + "block_store.contains fail: {e}" + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + } + + // Grab our current tip and return it + let tip = match validator.blockchain.last() { + Ok(v) => v, + Err(e) => { + error!( + target: "darkfid::proto::protocol_sync::handle_receive_tip_request", + "blockchain.last fail: {e}" + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + }; + + TipResponse { synced: true, height: Some(tip.0), hash: Some(tip.1) } + }; + + // Send response + handler.send_action(channel, ProtocolGenericAction::Response(response)).await; + } +} + +/// Background handler function for ProtocolSyncHeader. +async fn handle_receive_header_request( + handler: ProtocolGenericHandlerPtr, + validator: ValidatorPtr, +) -> Result<()> { + debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START"); + loop { + // Wait for a new header request message + let (channel, request) = match handler.receiver.recv().await { + Ok(r) => r, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_header_request", + "recv fail: {e}" + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + if !*validator.synced.read().await { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_header_request", + "Node still syncing blockchain, skipping..." + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + + // Grab the corresponding headers + let headers = match validator.blockchain.get_headers_before(request.height, BATCH) { + Ok(v) => v, + Err(e) => { + error!( + target: "darkfid::proto::protocol_sync::handle_receive_header_request", + "get_headers_before fail: {}", + e + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + }; + + // Send response + handler + .send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers })) + .await; + } +} + +/// Background handler function for ProtocolSync. +async fn handle_receive_request( + handler: ProtocolGenericHandlerPtr, + validator: ValidatorPtr, +) -> Result<()> { + debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START"); + loop { + // Wait for a new sync request message + let (channel, request) = match handler.receiver.recv().await { + Ok(r) => r, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_request", + "recv fail: {e}" + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + if !*validator.synced.read().await { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_request", + "Node still syncing blockchain, skipping..." + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + + // Check if request exists the configured limit + if request.headers.len() > BATCH { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_request", + "Node requested more blocks than allowed." + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + + // Grab the corresponding blocks + let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) { + Ok(v) => v, + Err(e) => { + error!( + target: "darkfid::proto::protocol_sync::handle_receive_request", + "get_blocks_after fail: {}", + e + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + }; + + // Send response + handler + .send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks })) + .await; + } +} + +/// Background handler function for ProtocolSyncFork. +async fn handle_receive_fork_request( + handler: ProtocolGenericHandlerPtr, + validator: ValidatorPtr, +) -> Result<()> { + debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START"); + loop { + // Wait for a new fork sync request message + let (channel, request) = match handler.receiver.recv().await { + Ok(r) => r, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_fork_request", + "recv fail: {e}" + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + if !*validator.synced.read().await { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_fork_request", + "Node still syncing blockchain, skipping..." + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + + debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}"); + + // If a fork tip is provided, grab its fork proposals sequence. + // Otherwise, grab best fork proposals sequence. + let proposals = match request.fork_tip { + Some(fork_tip) => { + validator.consensus.get_fork_proposals(request.tip, fork_tip, BATCH as u32).await + } + None => validator.consensus.get_best_fork_proposals(request.tip, BATCH as u32).await, + }; + let proposals = match proposals { + Ok(p) => p, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_sync::handle_receive_fork_request", + "Getting fork proposals failed: {}", + e + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + }; + + // Send response + handler + .send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals })) + .await; } } diff --git a/bin/darkfid/src/proto/protocol_tx.rs b/bin/darkfid/src/proto/protocol_tx.rs index 1aa8af934..7aa1251d9 100644 --- a/bin/darkfid/src/proto/protocol_tx.rs +++ b/bin/darkfid/src/proto/protocol_tx.rs @@ -18,119 +18,134 @@ use std::sync::Arc; -use async_trait::async_trait; -use log::debug; -use smol::Executor; +use log::{debug, error}; use tinyjson::JsonValue; -use url::Url; use darkfi::{ net::{ - ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, - ProtocolJobsManager, ProtocolJobsManagerPtr, + protocol::protocol_generic::{ + ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr, + }, + session::SESSION_DEFAULT, + P2pPtr, }, rpc::jsonrpc::JsonSubscriber, + system::ExecutorPtr, tx::Transaction, util::encoding::base64, validator::ValidatorPtr, - Result, + Error, Result, }; use darkfi_serial::serialize_async; -pub struct ProtocolTx { - tx_sub: MessageSubscription, - jobsman: ProtocolJobsManagerPtr, - validator: ValidatorPtr, - p2p: P2pPtr, - channel_address: Url, - subscriber: JsonSubscriber, +/// Atomic pointer to the `ProtocolTx` handler. +pub type ProtocolTxHandlerPtr = Arc; + +/// Handler managing [`Transaction`] messages, over a generic P2P protocol. +pub struct ProtocolTxHandler { + /// The generic handler for [`Transaction`] messages. + handler: ProtocolGenericHandlerPtr, } -impl ProtocolTx { - pub async fn init( - channel: ChannelPtr, - validator: ValidatorPtr, - p2p: P2pPtr, - subscriber: JsonSubscriber, - ) -> Result { +impl ProtocolTxHandler { + /// Initialize a generic prototocol handler for [`Transaction`] messages + /// and registers it to the provided P2P network, using the default session flag. + pub async fn init(p2p: &P2pPtr) -> ProtocolTxHandlerPtr { debug!( target: "darkfid::proto::protocol_tx::init", "Adding ProtocolTx to the protocol registry" ); - let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - let tx_sub = channel.subscribe_msg::().await?; + let handler = ProtocolGenericHandler::new(p2p, "ProtocolTx", SESSION_DEFAULT).await; - Ok(Arc::new(Self { - tx_sub, - jobsman: ProtocolJobsManager::new("TxProtocol", channel.clone()), - validator, - p2p, - channel_address: channel.address().clone(), - subscriber, - })) + Arc::new(Self { handler }) } - async fn handle_receive_tx(self: Arc) -> Result<()> { + /// Start the `ProtocolTx` background task. + pub async fn start( + &self, + executor: &ExecutorPtr, + validator: &ValidatorPtr, + subscriber: JsonSubscriber, + ) -> Result<()> { debug!( - target: "darkfid::proto::protocol_tx::handle_receive_tx", - "START" + target: "darkfid::proto::protocol_tx::start", + "Starting ProtocolTx handler task..." ); - let exclude_list = vec![self.channel_address.clone()]; - loop { - let tx = match self.tx_sub.receive().await { - Ok(v) => v, - Err(e) => { - debug!( - target: "darkfid::proto::protocol_tx::handle_receive_tx", - "recv fail: {e}" - ); - continue + + self.handler.task.clone().start( + handle_receive_tx(self.handler.clone(), validator.clone(), subscriber), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "darkfid::proto::protocol_tx::start", "Failed starting ProtocolTx handler task: {e}"), } - }; + }, + Error::DetachedTaskStopped, + executor.clone(), + ); - // Check if node has finished syncing its blockchain - if !*self.validator.synced.read().await { - debug!( - target: "darkfid::proto::protocol_tx::handle_receive_tx", - "Node still syncing blockchain, skipping..." - ); - continue - } + debug!( + target: "darkfid::proto::protocol_tx::start", + "ProtocolTx handler task started!" + ); - let tx_copy = (*tx).clone(); - - // Nodes use unconfirmed_txs vector as seen_txs pool. - match self.validator.append_tx(&tx_copy, true).await { - Ok(()) => { - self.p2p.broadcast_with_exclude(&tx_copy, &exclude_list).await; - let encoded_tx = - JsonValue::String(base64::encode(&serialize_async(&tx_copy).await)); - self.subscriber.notify(vec![encoded_tx].into()).await; - } - Err(e) => { - debug!( - target: "darkfid::proto::protocol_tx::handle_receive_tx", - "append_tx fail: {e}" - ); - } - } - } - } -} - -#[async_trait] -impl ProtocolBase for ProtocolTx { - async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "darkfid::proto::protocol_tx::start", "START"); - self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().handle_receive_tx(), executor.clone()).await; - debug!(target: "darkfid::proto::protocol_tx::start", "END"); Ok(()) } - fn name(&self) -> &'static str { - "ProtocolTx" + /// Stop the `ProtocolTx` background task. + pub async fn stop(&self) { + debug!(target: "darkfid::proto::protocol_tx::stop", "Terminating ProtocolTx handler task..."); + self.handler.task.stop().await; + debug!(target: "darkfid::proto::protocol_tx::stop", "ProtocolTx handler task terminated!"); + } +} + +/// Background handler function for ProtocolTx. +async fn handle_receive_tx( + handler: ProtocolGenericHandlerPtr, + validator: ValidatorPtr, + subscriber: JsonSubscriber, +) -> Result<()> { + debug!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "START"); + loop { + // Wait for a new transaction message + let (channel, tx) = match handler.receiver.recv().await { + Ok(r) => r, + Err(e) => { + debug!( + target: "darkfid::proto::protocol_tx::handle_receive_tx", + "recv fail: {e}" + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + if !*validator.synced.read().await { + debug!( + target: "darkfid::proto::protocol_tx::handle_receive_tx", + "Node still syncing blockchain, skipping..." + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + + // Append transaction + if let Err(e) = validator.append_tx(&tx, true).await { + debug!( + target: "darkfid::proto::protocol_tx::handle_receive_tx", + "append_tx fail: {e}" + ); + handler.send_action(channel, ProtocolGenericAction::Skip).await; + continue + } + + // Signal handler to broadcast the valid transaction to rest nodes + handler.send_action(channel, ProtocolGenericAction::Broadcast).await; + + // Notify subscriber + let encoded_tx = JsonValue::String(base64::encode(&serialize_async(&tx).await)); + subscriber.notify(vec![encoded_tx].into()).await; } } diff --git a/bin/darkfid/src/rpc.rs b/bin/darkfid/src/rpc.rs index 485d632d6..bde588cd4 100644 --- a/bin/darkfid/src/rpc.rs +++ b/bin/darkfid/src/rpc.rs @@ -121,9 +121,9 @@ impl Darkfid { let switch = params[0].get::().unwrap(); if *switch { - self.p2p.dnet_enable(); + self.p2p_handler.p2p.dnet_enable(); } else { - self.p2p.dnet_disable(); + self.p2p_handler.p2p.dnet_disable(); } JsonResponse::new(JsonValue::Boolean(true), id).into() @@ -224,6 +224,6 @@ impl Darkfid { impl HandlerP2p for Darkfid { fn p2p(&self) -> P2pPtr { - self.p2p.clone() + self.p2p_handler.p2p.clone() } } diff --git a/bin/darkfid/src/rpc_tx.rs b/bin/darkfid/src/rpc_tx.rs index 2fde0f6a2..71d1dac1f 100644 --- a/bin/darkfid/src/rpc_tx.rs +++ b/bin/darkfid/src/rpc_tx.rs @@ -133,8 +133,8 @@ impl Darkfid { return server_error(RpcError::TxSimulationFail, id, None) }; - self.p2p.broadcast(&tx).await; - if !self.p2p.is_connected() { + self.p2p_handler.p2p.broadcast(&tx).await; + if !self.p2p_handler.p2p.is_connected() { warn!(target: "darkfid::rpc::tx_broadcast", "No connected channels to broadcast tx"); } diff --git a/bin/darkfid/src/task/consensus.rs b/bin/darkfid/src/task/consensus.rs index a6d1c893b..ea8e546c4 100644 --- a/bin/darkfid/src/task/consensus.rs +++ b/bin/darkfid/src/task/consensus.rs @@ -166,7 +166,7 @@ async fn replicator_task(node: Arc, ex: Arc>) - let prop_subscription = proposals_sub.publisher.clone().subscribe().await; // Subscribe to the network disconnect subscriber - let net_subscription = node.p2p.hosts().subscribe_disconnect().await; + let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await; let result = smol::future::or( monitor_network(&net_subscription), diff --git a/bin/darkfid/src/task/miner.rs b/bin/darkfid/src/task/miner.rs index 3d5f6f391..51fab1454 100644 --- a/bin/darkfid/src/task/miner.rs +++ b/bin/darkfid/src/task/miner.rs @@ -338,7 +338,7 @@ async fn mine_next_block( extended_fork.module.verify_current_block(&next_block)?; // Check if we are connected to the network - if !skip_sync && !node.p2p.is_connected() { + if !skip_sync && !node.p2p_handler.p2p.is_connected() { return Err(Error::NetworkNotConnected) } @@ -348,7 +348,7 @@ async fn mine_next_block( // Broadcast proposal to the network let message = ProposalMessage(proposal); - node.p2p.broadcast(&message).await; + node.p2p_handler.p2p.broadcast(&message).await; Ok(()) } diff --git a/bin/darkfid/src/task/sync.rs b/bin/darkfid/src/task/sync.rs index 4748cd4ac..1cf7c6a18 100644 --- a/bin/darkfid/src/task/sync.rs +++ b/bin/darkfid/src/task/sync.rs @@ -138,11 +138,11 @@ async fn synced_peers( checkpoint: Option<(u32, HeaderHash)>, ) -> HashMap<(u32, [u8; 32]), Vec> { info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers..."); - let comms_timeout = node.p2p.settings().read().await.outbound_connect_timeout; + let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout; let mut tips = HashMap::new(); loop { // Grab channels - let peers = node.p2p.hosts().channels(); + let peers = node.p2p_handler.p2p.hosts().channels(); // Ask each peer(if we got any) if they are synced for peer in peers { @@ -210,7 +210,7 @@ async fn synced_peers( } warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry..."); - let subscription = node.p2p.hosts().subscribe_channel().await; + let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await; let _ = subscription.receive().await; subscription.unsubscribe().await; @@ -270,7 +270,7 @@ async fn retrieve_headers( } } } - let comms_timeout = node.p2p.settings().read().await.outbound_connect_timeout; + let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout; // We subtract 1 since tip_height is increased by one let total = tip_height - last_known - 1; @@ -390,7 +390,7 @@ async fn retrieve_blocks( } } } - let comms_timeout = node.p2p.settings().read().await.outbound_connect_timeout; + let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout; let mut received_blocks = 0; let total = node.validator.blockchain.headers.len_sync(); @@ -505,7 +505,7 @@ async fn sync_best_fork(node: &Darkfid, peers: &[ChannelPtr], last_tip: &HeaderH // Node waits for response let Ok(response) = response_sub - .receive_with_timeout(node.p2p.settings().read().await.outbound_connect_timeout) + .receive_with_timeout(node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout) .await else { debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}"); diff --git a/bin/darkfid/src/tests/harness.rs b/bin/darkfid/src/tests/harness.rs index e55f24a5d..4d713eac4 100644 --- a/bin/darkfid/src/tests/harness.rs +++ b/bin/darkfid/src/tests/harness.rs @@ -41,7 +41,11 @@ use num_bigint::BigUint; use sled_overlay::sled; use url::Url; -use crate::{proto::ProposalMessage, task::sync::sync_task, utils::spawn_p2p, Darkfid}; +use crate::{ + proto::{DarkfidP2pHandler, ProposalMessage}, + task::sync::sync_task, + Darkfid, +}; pub struct HarnessConfig { pub pow_target: u32, @@ -150,7 +154,7 @@ impl Harness { let proposal = Proposal::new(block.clone()); self.alice.validator.append_proposal(&proposal).await?; let message = ProposalMessage(proposal); - self.alice.p2p.broadcast(&message).await; + self.alice.p2p_handler.p2p.broadcast(&message).await; } // Sleep a bit so blocks can be propagated and then @@ -242,10 +246,19 @@ pub async fn generate_node( // We initialize a dnet subscriber but do not activate it. let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); - let p2p = spawn_p2p(settings, &validator, &subscribers, ex.clone()).await?; - let node = Darkfid::new(p2p.clone(), validator, miner, 50, subscribers, None, dnet_sub).await; + let p2p_handler = DarkfidP2pHandler::init(settings, ex).await?; + let node = Darkfid::new( + p2p_handler.clone(), + validator.clone(), + miner, + 50, + subscribers.clone(), + None, + dnet_sub, + ) + .await; - p2p.start().await?; + p2p_handler.clone().start(ex, &validator, &subscribers).await?; node.validator.consensus.generate_empty_fork().await?; diff --git a/bin/darkfid/src/tests/mod.rs b/bin/darkfid/src/tests/mod.rs index bdf2ad604..c5431e863 100644 --- a/bin/darkfid/src/tests/mod.rs +++ b/bin/darkfid/src/tests/mod.rs @@ -90,7 +90,7 @@ async fn sync_blocks_real(ex: Arc>) -> Result<()> { let charlie_url = Url::parse("tcp+tls://127.0.0.1:18342")?; settings.inbound_addrs = vec![charlie_url]; - let bob_url = th.bob.p2p.settings().read().await.inbound_addrs[0].clone(); + let bob_url = th.bob.p2p_handler.p2p.settings().read().await.inbound_addrs[0].clone(); settings.peers = vec![bob_url]; let charlie = generate_node( &th.vks, diff --git a/bin/darkfid/src/tests/sync_forks.rs b/bin/darkfid/src/tests/sync_forks.rs index 55f83047e..cb2b05ad5 100644 --- a/bin/darkfid/src/tests/sync_forks.rs +++ b/bin/darkfid/src/tests/sync_forks.rs @@ -67,7 +67,7 @@ async fn sync_forks_real(ex: Arc>) -> Result<()> { let charlie_url = Url::parse("tcp+tls://127.0.0.1:18442")?; settings.inbound_addrs = vec![charlie_url]; - let bob_url = th.bob.p2p.settings().read().await.inbound_addrs[0].clone(); + let bob_url = th.bob.p2p_handler.p2p.settings().read().await.inbound_addrs[0].clone(); settings.peers = vec![bob_url]; let charlie = generate_node(&th.vks, &th.validator_config, &settings, &ex, false, false, None).await?; diff --git a/bin/darkfid/src/utils.rs b/bin/darkfid/src/utils.rs index 36720c348..93c23efe9 100644 --- a/bin/darkfid/src/utils.rs +++ b/bin/darkfid/src/utils.rs @@ -16,70 +16,13 @@ * along with this program. If not, see . */ -use std::{collections::HashMap, sync::Arc}; - -use log::{debug, error, info}; -use smol::{fs::read_to_string, Executor}; +use log::{debug, error}; +use smol::fs::read_to_string; use structopt_toml::StructOptToml; -use darkfi::{ - net::{session::SESSION_DEFAULT, P2p, P2pPtr, Settings}, - rpc::jsonrpc::JsonSubscriber, - util::path::get_config_path, - validator::ValidatorPtr, - Error, Result, -}; +use darkfi::{util::path::get_config_path, Error, Result}; -use crate::{ - proto::{ProtocolProposal, ProtocolSync, ProtocolTx}, - BlockchainNetwork, CONFIG_FILE, -}; - -/// Auxiliary function to generate the P2P network and register all its protocols. -pub async fn spawn_p2p( - settings: &Settings, - validator: &ValidatorPtr, - subscribers: &HashMap<&'static str, JsonSubscriber>, - executor: Arc>, -) -> Result { - info!(target: "darkfid", "Registering sync network P2P protocols..."); - let p2p = P2p::new(settings.clone(), executor.clone()).await?; - let registry = p2p.protocol_registry(); - - let _validator = validator.clone(); - registry - .register(SESSION_DEFAULT, move |channel, _p2p| { - let validator = _validator.clone(); - async move { ProtocolSync::init(channel, validator).await.unwrap() } - }) - .await; - - let _validator = validator.clone(); - let _subscriber = subscribers.get("proposals").unwrap().clone(); - registry - .register(SESSION_DEFAULT, move |channel, p2p| { - let validator = _validator.clone(); - let subscriber = _subscriber.clone(); - async move { - ProtocolProposal::init(channel, validator, p2p, subscriber) - .await - .unwrap() - } - }) - .await; - - let _validator = validator.clone(); - let _subscriber = subscribers.get("txs").unwrap().clone(); - registry - .register(SESSION_DEFAULT, move |channel, p2p| { - let validator = _validator.clone(); - let subscriber = _subscriber.clone(); - async move { ProtocolTx::init(channel, validator, p2p, subscriber).await.unwrap() } - }) - .await; - - Ok(p2p) -} +use crate::{BlockchainNetwork, CONFIG_FILE}; /// Auxiliary function to parse darkfid configuration file and extract requested /// blockchain network config.