darkfid: refactored p2p stack to use generic protos

This commit is contained in:
skoupidi
2024-09-18 19:40:59 +03:00
parent c73ee72a41
commit 25dffaeba2
14 changed files with 804 additions and 652 deletions

View File

@@ -30,7 +30,7 @@ use darkfi::{
async_daemonize,
blockchain::BlockInfo,
cli_desc,
net::{settings::SettingsOpt, P2pPtr},
net::settings::SettingsOpt,
rpc::{
client::RpcChadClient,
jsonrpc::JsonSubscriber,
@@ -60,10 +60,11 @@ use task::{consensus::ConsensusInitTaskConfig, consensus_init_task};
/// P2P net protocols
mod proto;
use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
/// Utility functions
mod utils;
use utils::{parse_blockchain_config, spawn_p2p};
use utils::parse_blockchain_config;
const CONFIG_FILE: &str = "darkfid_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml");
@@ -186,8 +187,8 @@ impl MinerRpcCLient {
/// Daemon structure
pub struct Darkfid {
/// P2P network pointer
p2p: P2pPtr,
/// P2P network protocols handler.
p2p_handler: DarkfidP2pHandlerPtr,
/// Validator(node) pointer
validator: ValidatorPtr,
/// Flag to specify node is a miner
@@ -206,7 +207,7 @@ pub struct Darkfid {
impl Darkfid {
pub async fn new(
p2p: P2pPtr,
p2p_handler: DarkfidP2pHandlerPtr,
validator: ValidatorPtr,
miner: bool,
txs_batch_size: usize,
@@ -215,7 +216,7 @@ impl Darkfid {
dnet_sub: JsonSubscriber,
) -> Self {
Self {
p2p,
p2p_handler,
validator,
miner,
txs_batch_size,
@@ -288,8 +289,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
// Initialize P2P network
let p2p =
spawn_p2p(&blockchain_config.net.into(), &validator, &subscribers, ex.clone()).await?;
let p2p_handler = DarkfidP2pHandler::init(&blockchain_config.net.into(), &ex).await?;
// Initialize JSON-RPC client to perform requests to minerd
let rpc_client = if blockchain_config.miner {
@@ -319,7 +319,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
info!(target: "darkfid", "Starting dnet subs task");
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
let p2p_ = p2p.clone();
let p2p_ = p2p_handler.p2p.clone();
let dnet_task = StoppableTask::new();
dnet_task.clone().start(
async move {
@@ -342,11 +342,11 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
// Initialize node
let darkfid = Darkfid::new(
p2p.clone(),
validator,
p2p_handler.clone(),
validator.clone(),
blockchain_config.miner,
txs_batch_size,
subscribers,
subscribers.clone(),
rpc_client,
dnet_sub,
)
@@ -383,7 +383,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
);
info!(target: "darkfid", "Starting P2P network");
p2p.clone().start().await?;
p2p_handler.clone().start(&ex, &validator, &subscribers).await?;
// Consensus protocol
info!(target: "darkfid", "Starting consensus protocol task");
@@ -424,8 +424,8 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
info!(target: "darkfid", "Stopping JSON-RPC server...");
rpc_task.stop().await;
info!(target: "darkfid", "Stopping P2P network...");
p2p.stop().await;
info!(target: "darkfid", "Stopping P2P network protocols handler...");
p2p_handler.stop().await;
info!(target: "darkfid", "Stopping consensus task...");
consensus_task.stop().await;

View File

@@ -16,19 +16,130 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashMap, sync::Arc};
use darkfi::{
net::{P2p, P2pPtr, Settings},
rpc::jsonrpc::JsonSubscriber,
system::ExecutorPtr,
validator::ValidatorPtr,
Result,
};
use log::info;
// TODO: Protocal functions need to be protected so peers can't spam us.
/// Block proposal broadcast protocol
mod protocol_proposal;
pub use protocol_proposal::{ProposalMessage, ProtocolProposal};
pub use protocol_proposal::{ProposalMessage, ProtocolProposalHandler, ProtocolProposalHandlerPtr};
/// Validator blockchain sync protocol
mod protocol_sync;
pub use protocol_sync::{
ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, ProtocolSync,
SyncRequest, SyncResponse, TipRequest, TipResponse, BATCH,
ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, ProtocolSyncHandler,
ProtocolSyncHandlerPtr, SyncRequest, SyncResponse, TipRequest, TipResponse, BATCH,
};
/// Transaction broadcast protocol
mod protocol_tx;
pub use protocol_tx::ProtocolTx;
pub use protocol_tx::{ProtocolTxHandler, ProtocolTxHandlerPtr};
/// Atomic pointer to the Darkfid P2P protocols handler.
pub type DarkfidP2pHandlerPtr = Arc<DarkfidP2pHandler>;
/// Darkfid P2P protocols handler.
pub struct DarkfidP2pHandler {
/// P2P network pointer
pub p2p: P2pPtr,
/// `ProtocolProposal` messages handler
proposals: ProtocolProposalHandlerPtr,
/// `ProtocolSync` messages handler
sync: ProtocolSyncHandlerPtr,
/// `ProtocolTx` messages handler
txs: ProtocolTxHandlerPtr,
}
impl DarkfidP2pHandler {
/// Initialize a Darkfid P2P protocols handler.
///
/// A new P2P instance is generated using provided settings and all
/// corresponding protocols are registered.
pub async fn init(settings: &Settings, executor: &ExecutorPtr) -> Result<DarkfidP2pHandlerPtr> {
info!(
target: "darkfid::proto::mod::DarkfidP2pHandler::init",
"Initializing a new Darkfid P2P handler..."
);
// Generate a new P2P instance
let p2p = P2p::new(settings.clone(), executor.clone()).await?;
// Generate a new `ProtocolProposal` messages handler
let proposals = ProtocolProposalHandler::init(&p2p).await;
// Generate a new `ProtocolSync` messages handler
let sync = ProtocolSyncHandler::init(&p2p).await;
// Generate a new `ProtocolTx` messages handler
let txs = ProtocolTxHandler::init(&p2p).await;
info!(
target: "darkfid::proto::mod::DarkfidP2pHandler::init",
"Darkfid P2P handler generated successfully!"
);
Ok(Arc::new(Self { p2p, proposals, sync, txs }))
}
/// Start the Darkfid P2P protocols handler for provided validator.
pub async fn start(
&self,
executor: &ExecutorPtr,
validator: &ValidatorPtr,
subscribers: &HashMap<&'static str, JsonSubscriber>,
) -> Result<()> {
info!(
target: "darkfid::proto::mod::DarkfidP2pHandler::start",
"Starting the Darkfid P2P handler..."
);
// Start the `ProtocolProposal` messages handler
let subscriber = subscribers.get("proposals").unwrap().clone();
self.proposals.start(executor, validator, &self.p2p, subscriber).await?;
// Start the `ProtocolSync` messages handler
self.sync.start(executor, validator).await?;
// Start the `ProtocolTx` messages handler
let subscriber = subscribers.get("txs").unwrap().clone();
self.txs.start(executor, validator, subscriber).await?;
// Start the P2P instance
self.p2p.clone().start().await?;
info!(
target: "darkfid::proto::mod::DarkfidP2pHandler::start",
"Darkfid P2P handler started successfully!"
);
Ok(())
}
/// Stop the Darkfid P2P protocols handler.
pub async fn stop(&self) {
info!(target: "darkfid::proto::mod::DarkfidP2pHandler::stop", "Terminating Darkfid P2P handler...");
// Stop the P2P instance
self.p2p.stop().await;
// Start the `ProtocolTx` messages handler
self.txs.stop().await;
// Start the `ProtocolSync` messages handler
self.sync.stop().await;
// Start the `ProtocolProposal` messages handler
self.proposals.stop().await;
info!(target: "darkfid::proto::mod::DarkfidP2pHandler::stop", "Darkfid P2P handler terminated successfully!");
}
}

View File

@@ -20,16 +20,19 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error, warn};
use smol::Executor;
use tinyjson::JsonValue;
use darkfi::{
impl_p2p_message,
net::{
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
session::SESSION_DEFAULT,
Message, P2pPtr,
},
rpc::jsonrpc::JsonSubscriber,
system::ExecutorPtr,
util::encoding::base64,
validator::{consensus::Proposal, ValidatorPtr},
Error, Result,
@@ -44,191 +47,217 @@ pub struct ProposalMessage(pub Proposal);
impl_p2p_message!(ProposalMessage, "proposal");
pub struct ProtocolProposal {
proposal_sub: MessageSubscription<ProposalMessage>,
proposals_response_sub: MessageSubscription<ForkSyncResponse>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
channel: ChannelPtr,
subscriber: JsonSubscriber,
/// Atomic pointer to the `ProtocolProposal` handler.
pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
/// Handler managing [`Proposal`] messages, over a generic P2P protocol.
pub struct ProtocolProposalHandler {
/// The generic handler for [`Proposal`] messages.
handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
}
impl ProtocolProposal {
pub async fn init(
channel: ChannelPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: JsonSubscriber,
) -> Result<ProtocolBasePtr> {
impl ProtocolProposalHandler {
/// Initialize a generic prototocol handler for [`Proposal`] messages
/// and registers it to the provided P2P network, using the default session flag.
pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
debug!(
target: "darkfid::proto::protocol_proposal::init",
"Adding ProtocolProposal to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<ProposalMessage>().await;
let proposal_sub = channel.subscribe_msg::<ProposalMessage>().await?;
let proposals_response_sub = channel.subscribe_msg::<ForkSyncResponse>().await?;
let handler = ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
Ok(Arc::new(Self {
proposal_sub,
proposals_response_sub,
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()),
validator,
p2p,
channel,
subscriber,
}))
Arc::new(Self { handler })
}
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
let exclude_list = vec![self.channel.address().clone()];
loop {
let proposal = match self.proposal_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"recv fail: {e}"
);
continue
/// Start the `ProtocolProposal` background task.
pub async fn start(
&self,
executor: &ExecutorPtr,
validator: &ValidatorPtr,
p2p: &P2pPtr,
subscriber: JsonSubscriber,
) -> Result<()> {
debug!(
target: "darkfid::proto::protocol_proposal::start",
"Starting ProtocolProposal handler task..."
);
self.handler.task.clone().start(
handle_receive_proposal(self.handler.clone(), validator.clone(), p2p.clone(), subscriber),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
}
};
},
Error::DetachedTaskStopped,
executor.clone(),
);
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"Node still syncing blockchain, skipping..."
);
continue
}
debug!(
target: "darkfid::proto::protocol_proposal::start",
"ProtocolProposal handler task started!"
);
let proposal_copy = (*proposal).clone();
match self.validator.append_proposal(&proposal_copy.0).await {
Ok(()) => {
self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await;
let enc_prop =
JsonValue::String(base64::encode(&serialize_async(&proposal_copy).await));
self.subscriber.notify(vec![enc_prop].into()).await;
continue
}
Err(e) => {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"append_proposal fail: {e}",
);
match e {
Error::ExtendedChainIndexNotFound => { /* Do nothing */ }
_ => continue,
}
}
};
// If proposal fork chain was not found, we ask our peer for its sequence
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence");
// Cleanup subscriber
if let Err(e) = self.proposals_response_sub.clean().await {
error!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"Error during proposals response subscriber cleanup: {e}"
);
continue
};
// Grab last known block to create the request and execute it
let last = match self.validator.blockchain.last() {
Ok(l) => l,
Err(e) => {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Blockchain last retriaval failed: {e}");
continue
}
};
let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal_copy.0.hash) };
if let Err(e) = self.channel.send(&request).await {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Channel send failed: {e}");
continue
};
// Node waits for response
let response = match self
.proposals_response_sub
.receive_with_timeout(self.p2p.settings().read().await.outbound_connect_timeout)
.await
{
Ok(r) => r,
Err(e) => {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence failed: {e}");
continue
}
};
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer response: {response:?}");
// Verify and store retrieved proposals
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Processing received proposals");
// Response should not be empty
if response.proposals.is_empty() {
warn!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer responded with empty sequence, node might be out of sync!");
continue
}
// Sequence length must correspond to requested height
if response.proposals.len() as u32 != proposal_copy.0.block.header.height - last.0 {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence length is erroneous");
continue
}
// First proposal must extend canonical
if response.proposals[0].block.header.previous != last.1 {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't extend canonical");
continue
}
// Last proposal must be the same as the one requested
if response.proposals.last().unwrap().hash != proposal_copy.0.hash {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't correspond to requested tip");
continue
}
for proposal in &response.proposals {
match self.validator.append_proposal(proposal).await {
Ok(()) => { /* Do nothing */ }
// Skip already existing proposals
Err(Error::ProposalAlreadyExists) => continue,
Err(e) => {
error!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"Error while appending response proposal: {e}"
);
}
};
let message = ProposalMessage(proposal.clone());
self.p2p.broadcast_with_exclude(&message, &exclude_list).await;
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
self.subscriber.notify(vec![enc_prop].into()).await;
}
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolProposal {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_proposal::start", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await;
debug!(target: "darkfid::proto::protocol_proposal::start", "END");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolProposal"
/// Stop the `ProtocolProposal` background task.
pub async fn stop(&self) {
debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
self.handler.task.stop().await;
debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
}
}
/// Background handler function for ProtocolProposal.
async fn handle_receive_proposal(
handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: JsonSubscriber,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
loop {
// Wait for a new proposal message
let (channel, proposal) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"recv fail: {e}"
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
// Append proposal
match validator.append_proposal(&proposal.0).await {
Ok(()) => {
// Signal handler to broadcast the valid proposal to rest nodes
handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
subscriber.notify(vec![enc_prop].into()).await;
continue
}
Err(e) => {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"append_proposal fail: {e}",
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
match e {
Error::ExtendedChainIndexNotFound => { /* Do nothing */ }
_ => continue,
}
}
};
// If proposal fork chain was not found, we ask our peer for its sequence
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence");
let Some(channel) = p2p.get_channel(channel) else {
error!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Channel {channel} wasn't found.");
continue
};
// Communication setup
let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
error!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
continue
};
// Grab last known block to create the request and execute it
let last = match validator.blockchain.last() {
Ok(l) => l,
Err(e) => {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Blockchain last retriaval failed: {e}");
continue
}
};
let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.0.hash) };
if let Err(e) = channel.send(&request).await {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Channel send failed: {e}");
continue
};
// Node waits for response
let response = match response_sub
.receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
.await
{
Ok(r) => r,
Err(e) => {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence failed: {e}");
continue
}
};
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer response: {response:?}");
// Verify and store retrieved proposals
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Processing received proposals");
// Response should not be empty
if response.proposals.is_empty() {
warn!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Peer responded with empty sequence, node might be out of sync!");
continue
}
// Sequence length must correspond to requested height
if response.proposals.len() as u32 != proposal.0.block.header.height - last.0 {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence length is erroneous");
continue
}
// First proposal must extend canonical
if response.proposals[0].block.header.previous != last.1 {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't extend canonical");
continue
}
// Last proposal must be the same as the one requested
if response.proposals.last().unwrap().hash != proposal.0.hash {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Response sequence doesn't correspond to requested tip");
continue
}
// Process response proposals
for proposal in &response.proposals {
// Append proposal
match validator.append_proposal(proposal).await {
Ok(()) => { /* Do nothing */ }
// Skip already existing proposals
Err(Error::ProposalAlreadyExists) => continue,
Err(e) => {
error!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"Error while appending response proposal: {e}"
);
}
};
// Broadcast proposal to rest nodes
let message = ProposalMessage(proposal.clone());
p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
subscriber.notify(vec![enc_prop].into()).await;
}
}
}

View File

@@ -20,17 +20,20 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error};
use smol::Executor;
use darkfi::{
blockchain::{BlockInfo, Header, HeaderHash},
impl_p2p_message,
net::{
ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
session::SESSION_DEFAULT,
Message, P2pPtr,
},
system::ExecutorPtr,
validator::{consensus::Proposal, ValidatorPtr},
Result,
Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
@@ -40,7 +43,7 @@ pub const BATCH: usize = 20;
/// Structure represening a request to ask a node for their current
/// canonical(finalized) tip block hash, if they are synced. We also
/// include our own tip, so they can verify we follow the same sequence.
#[derive(Debug, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipRequest {
/// Canonical(finalized) tip block hash
pub tip: HeaderHash,
@@ -51,7 +54,7 @@ impl_p2p_message!(TipRequest, "tiprequest");
/// Structure representing the response to `TipRequest`,
/// containing a boolean flag to indicate if we are synced,
/// and our canonical(finalized) tip block height and hash.
#[derive(Debug, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipResponse {
/// Flag indicating the node is synced
pub synced: bool,
@@ -65,7 +68,7 @@ impl_p2p_message!(TipResponse, "tipresponse");
/// Structure represening a request to ask a node for up to `BATCH` headers before
/// the provided header height.
#[derive(Debug, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncRequest {
/// Header height
pub height: u32,
@@ -75,7 +78,7 @@ impl_p2p_message!(HeaderSyncRequest, "headersyncrequest");
/// Structure representing the response to `HeaderSyncRequest`,
/// containing up to `BATCH` headers before the requested block height.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncResponse {
/// Response headers
pub headers: Vec<Header>,
@@ -85,7 +88,7 @@ impl_p2p_message!(HeaderSyncResponse, "headersyncresponse");
/// Structure represening a request to ask a node for up to`BATCH` blocks
/// of provided headers.
#[derive(Debug, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncRequest {
/// Header hashes
pub headers: Vec<HeaderHash>,
@@ -95,7 +98,7 @@ impl_p2p_message!(SyncRequest, "syncrequest");
/// Structure representing the response to `SyncRequest`,
/// containing up to `BATCH` blocks after the requested block height.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncResponse {
/// Response blocks
pub blocks: Vec<BlockInfo>,
@@ -108,7 +111,7 @@ impl_p2p_message!(SyncResponse, "syncresponse");
/// otherwise they respond with their best fork sequence.
/// We also include our own canonical(finalized) tip, so they can verify
/// we follow the same sequence.
#[derive(Debug, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncRequest {
/// Canonical(finalized) tip block hash
pub tip: HeaderHash,
@@ -120,7 +123,7 @@ impl_p2p_message!(ForkSyncRequest, "forksyncrequest");
/// Structure representing the response to `ForkSyncRequest`,
/// containing the requested fork sequence.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncResponse {
/// Response fork proposals
pub proposals: Vec<Proposal>,
@@ -128,308 +131,346 @@ pub struct ForkSyncResponse {
impl_p2p_message!(ForkSyncResponse, "forksyncresponse");
pub struct ProtocolSync {
tip_sub: MessageSubscription<TipRequest>,
header_sub: MessageSubscription<HeaderSyncRequest>,
request_sub: MessageSubscription<SyncRequest>,
fork_request_sub: MessageSubscription<ForkSyncRequest>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
channel: ChannelPtr,
/// Atomic pointer to the `ProtocolSync` handler.
pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;
/// Handler managing all `ProtocolSync` messages, over generic P2P protocols.
pub struct ProtocolSyncHandler {
/// The generic handler for `TipRequest` messages.
tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
/// The generic handler for `HeaderSyncRequest` messages.
header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
/// The generic handler for `SyncRequest` messages.
sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
/// The generic handler for `ForkSyncRequest` messages.
fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
}
impl ProtocolSync {
pub async fn init(channel: ChannelPtr, validator: ValidatorPtr) -> Result<ProtocolBasePtr> {
impl ProtocolSyncHandler {
/// Initialize the generic prototocol handlers for all `ProtocolSync` messages
/// and register them to the provided P2P network, using the default session flag.
pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
debug!(
target: "darkfid::proto::protocol_sync::init",
"Adding ProtocolSync to the protocol registry"
"Adding all sync protocols to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<TipRequest>().await;
msg_subsystem.add_dispatch::<TipResponse>().await;
msg_subsystem.add_dispatch::<HeaderSyncRequest>().await;
msg_subsystem.add_dispatch::<HeaderSyncResponse>().await;
msg_subsystem.add_dispatch::<SyncRequest>().await;
msg_subsystem.add_dispatch::<SyncResponse>().await;
msg_subsystem.add_dispatch::<ForkSyncRequest>().await;
msg_subsystem.add_dispatch::<ForkSyncResponse>().await;
let tip_sub = channel.subscribe_msg::<TipRequest>().await?;
let header_sub = channel.subscribe_msg::<HeaderSyncRequest>().await?;
let request_sub = channel.subscribe_msg::<SyncRequest>().await?;
let fork_request_sub = channel.subscribe_msg::<ForkSyncRequest>().await?;
let tip_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
let header_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
let fork_sync_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
Ok(Arc::new(Self {
tip_sub,
header_sub,
request_sub,
fork_request_sub,
jobsman: ProtocolJobsManager::new("SyncProtocol", channel.clone()),
validator,
channel,
}))
Arc::new(Self { tip_handler, header_handler, sync_handler, fork_sync_handler })
}
async fn handle_receive_tip_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
loop {
let request = match self.tip_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"recv fail: {e}"
);
continue
/// Start all `ProtocolSync` background tasks.
pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
debug!(
target: "darkfid::proto::protocol_sync::start",
"Starting sync protocols handlers tasks..."
);
self.tip_handler.task.clone().start(
handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
}
};
},
Error::DetachedTaskStopped,
executor.clone(),
);
// Check if node has finished syncing its blockchain
let response = if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"Node still syncing blockchain, skipping..."
);
TipResponse { synced: false, height: None, hash: None }
} else {
// Check we follow the same sequence
match self.validator.blockchain.blocks.contains(&request.tip) {
Ok(contains) => {
if !contains {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"Node doesn't follow request sequence"
);
continue
}
}
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"block_store.contains fail: {e}"
);
continue
}
self.header_handler.task.clone().start(
handle_receive_header_request(self.header_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
// Grab our current tip and return it
let tip = match self.validator.blockchain.last() {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"blockchain.last fail: {e}"
);
continue
}
};
TipResponse { synced: true, height: Some(tip.0), hash: Some(tip.1) }
};
if let Err(e) = self.channel.send(&response).await {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"Channel send fail: {e}"
)
};
}
}
async fn handle_receive_header_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
loop {
let request = match self.header_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"recv fail: {}",
e
);
continue
self.sync_handler.task.clone().start(
handle_receive_request(self.sync_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
}
};
},
Error::DetachedTaskStopped,
executor.clone(),
);
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"Node still syncing blockchain, skipping..."
);
continue
}
let headers = match self.validator.blockchain.get_headers_before(request.height, BATCH)
{
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"get_headers_before fail: {}",
e
);
continue
self.fork_sync_handler.task.clone().start(
handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
}
};
},
Error::DetachedTaskStopped,
executor.clone(),
);
let response = HeaderSyncResponse { headers };
if let Err(e) = self.channel.send(&response).await {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"channel send fail: {}",
e
)
};
}
}
debug!(
target: "darkfid::proto::protocol_sync::start",
"Sync protocols handlers tasks started!"
);
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
loop {
let request = match self.request_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"recv fail: {}",
e
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Node still syncing blockchain, skipping..."
);
continue
}
// Check if request exists the configured limit
if request.headers.len() > BATCH {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Node requested more blocks than allowed."
);
continue
}
let blocks = match self.validator.blockchain.get_blocks_by_hash(&request.headers) {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"get_blocks_after fail: {}",
e
);
continue
}
};
let response = SyncResponse { blocks };
if let Err(e) = self.channel.send(&response).await {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"channel send fail: {}",
e
)
};
}
}
async fn handle_receive_fork_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
loop {
let request = match self.fork_request_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"recv fail: {}",
e
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"Node still syncing blockchain, skipping..."
);
continue
}
debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}");
// If a fork tip is provided, grab its fork proposals sequence.
// Otherwise, grab best fork proposals sequence.
let proposals = match request.fork_tip {
Some(fork_tip) => {
self.validator
.consensus
.get_fork_proposals(request.tip, fork_tip, BATCH as u32)
.await
}
None => {
self.validator
.consensus
.get_best_fork_proposals(request.tip, BATCH as u32)
.await
}
};
let proposals = match proposals {
Ok(p) => p,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Getting fork proposals failed: {}",
e
);
continue
}
};
let response = ForkSyncResponse { proposals };
debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Response: {response:?}");
if let Err(e) = self.channel.send(&response).await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"channel send fail: {}",
e
)
};
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolSync {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::start", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman
.clone()
.spawn(self.clone().handle_receive_tip_request(), executor.clone())
.await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_header_request(), executor.clone())
.await;
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_fork_request(), executor.clone())
.await;
debug!(target: "darkfid::proto::protocol_sync::start", "END");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolSync"
/// Stop all `ProtocolSync` background tasks.
pub async fn stop(&self) {
debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
self.tip_handler.task.stop().await;
self.header_handler.task.stop().await;
self.sync_handler.task.stop().await;
self.fork_sync_handler.task.stop().await;
debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
}
}
/// Background handler function for ProtocolSyncTip.
async fn handle_receive_tip_request(
handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
loop {
// Wait for a new tip request message
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"recv fail: {e}"
);
continue
}
};
// Check if node has finished syncing its blockchain
let response = if !*validator.synced.read().await {
TipResponse { synced: false, height: None, hash: None }
} else {
// Check we follow the same sequence
match validator.blockchain.blocks.contains(&request.tip) {
Ok(contains) => {
if !contains {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"Node doesn't follow request sequence"
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
}
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"block_store.contains fail: {e}"
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
}
// Grab our current tip and return it
let tip = match validator.blockchain.last() {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"blockchain.last fail: {e}"
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
TipResponse { synced: true, height: Some(tip.0), hash: Some(tip.1) }
};
// Send response
handler.send_action(channel, ProtocolGenericAction::Response(response)).await;
}
}
/// Background handler function for ProtocolSyncHeader.
async fn handle_receive_header_request(
handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
loop {
// Wait for a new header request message
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"recv fail: {e}"
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
// Grab the corresponding headers
let headers = match validator.blockchain.get_headers_before(request.height, BATCH) {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"get_headers_before fail: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
// Send response
handler
.send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers }))
.await;
}
}
/// Background handler function for ProtocolSync.
async fn handle_receive_request(
handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
loop {
// Wait for a new sync request message
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"recv fail: {e}"
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
// Check if request exists the configured limit
if request.headers.len() > BATCH {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Node requested more blocks than allowed."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
// Grab the corresponding blocks
let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"get_blocks_after fail: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
// Send response
handler
.send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks }))
.await;
}
}
/// Background handler function for ProtocolSyncFork.
async fn handle_receive_fork_request(
handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
loop {
// Wait for a new fork sync request message
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"recv fail: {e}"
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");
// If a fork tip is provided, grab its fork proposals sequence.
// Otherwise, grab best fork proposals sequence.
let proposals = match request.fork_tip {
Some(fork_tip) => {
validator.consensus.get_fork_proposals(request.tip, fork_tip, BATCH as u32).await
}
None => validator.consensus.get_best_fork_proposals(request.tip, BATCH as u32).await,
};
let proposals = match proposals {
Ok(p) => p,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"Getting fork proposals failed: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
// Send response
handler
.send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals }))
.await;
}
}

View File

@@ -18,119 +18,134 @@
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use smol::Executor;
use log::{debug, error};
use tinyjson::JsonValue;
use url::Url;
use darkfi::{
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
session::SESSION_DEFAULT,
P2pPtr,
},
rpc::jsonrpc::JsonSubscriber,
system::ExecutorPtr,
tx::Transaction,
util::encoding::base64,
validator::ValidatorPtr,
Result,
Error, Result,
};
use darkfi_serial::serialize_async;
pub struct ProtocolTx {
tx_sub: MessageSubscription<Transaction>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
channel_address: Url,
subscriber: JsonSubscriber,
/// Atomic pointer to the `ProtocolTx` handler.
pub type ProtocolTxHandlerPtr = Arc<ProtocolTxHandler>;
/// Handler managing [`Transaction`] messages, over a generic P2P protocol.
pub struct ProtocolTxHandler {
/// The generic handler for [`Transaction`] messages.
handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
}
impl ProtocolTx {
pub async fn init(
channel: ChannelPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: JsonSubscriber,
) -> Result<ProtocolBasePtr> {
impl ProtocolTxHandler {
/// Initialize a generic prototocol handler for [`Transaction`] messages
/// and registers it to the provided P2P network, using the default session flag.
pub async fn init(p2p: &P2pPtr) -> ProtocolTxHandlerPtr {
debug!(
target: "darkfid::proto::protocol_tx::init",
"Adding ProtocolTx to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<Transaction>().await;
let tx_sub = channel.subscribe_msg::<Transaction>().await?;
let handler = ProtocolGenericHandler::new(p2p, "ProtocolTx", SESSION_DEFAULT).await;
Ok(Arc::new(Self {
tx_sub,
jobsman: ProtocolJobsManager::new("TxProtocol", channel.clone()),
validator,
p2p,
channel_address: channel.address().clone(),
subscriber,
}))
Arc::new(Self { handler })
}
async fn handle_receive_tx(self: Arc<Self>) -> Result<()> {
/// Start the `ProtocolTx` background task.
pub async fn start(
&self,
executor: &ExecutorPtr,
validator: &ValidatorPtr,
subscriber: JsonSubscriber,
) -> Result<()> {
debug!(
target: "darkfid::proto::protocol_tx::handle_receive_tx",
"START"
target: "darkfid::proto::protocol_tx::start",
"Starting ProtocolTx handler task..."
);
let exclude_list = vec![self.channel_address.clone()];
loop {
let tx = match self.tx_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_tx::handle_receive_tx",
"recv fail: {e}"
);
continue
self.handler.task.clone().start(
handle_receive_tx(self.handler.clone(), validator.clone(), subscriber),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "darkfid::proto::protocol_tx::start", "Failed starting ProtocolTx handler task: {e}"),
}
};
},
Error::DetachedTaskStopped,
executor.clone(),
);
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_tx::handle_receive_tx",
"Node still syncing blockchain, skipping..."
);
continue
}
debug!(
target: "darkfid::proto::protocol_tx::start",
"ProtocolTx handler task started!"
);
let tx_copy = (*tx).clone();
// Nodes use unconfirmed_txs vector as seen_txs pool.
match self.validator.append_tx(&tx_copy, true).await {
Ok(()) => {
self.p2p.broadcast_with_exclude(&tx_copy, &exclude_list).await;
let encoded_tx =
JsonValue::String(base64::encode(&serialize_async(&tx_copy).await));
self.subscriber.notify(vec![encoded_tx].into()).await;
}
Err(e) => {
debug!(
target: "darkfid::proto::protocol_tx::handle_receive_tx",
"append_tx fail: {e}"
);
}
}
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolTx {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_tx::start", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_tx(), executor.clone()).await;
debug!(target: "darkfid::proto::protocol_tx::start", "END");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolTx"
/// Stop the `ProtocolTx` background task.
pub async fn stop(&self) {
debug!(target: "darkfid::proto::protocol_tx::stop", "Terminating ProtocolTx handler task...");
self.handler.task.stop().await;
debug!(target: "darkfid::proto::protocol_tx::stop", "ProtocolTx handler task terminated!");
}
}
/// Background handler function for ProtocolTx.
async fn handle_receive_tx(
handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
validator: ValidatorPtr,
subscriber: JsonSubscriber,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "START");
loop {
// Wait for a new transaction message
let (channel, tx) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_tx::handle_receive_tx",
"recv fail: {e}"
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_tx::handle_receive_tx",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
// Append transaction
if let Err(e) = validator.append_tx(&tx, true).await {
debug!(
target: "darkfid::proto::protocol_tx::handle_receive_tx",
"append_tx fail: {e}"
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
// Signal handler to broadcast the valid transaction to rest nodes
handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
// Notify subscriber
let encoded_tx = JsonValue::String(base64::encode(&serialize_async(&tx).await));
subscriber.notify(vec![encoded_tx].into()).await;
}
}

View File

@@ -121,9 +121,9 @@ impl Darkfid {
let switch = params[0].get::<bool>().unwrap();
if *switch {
self.p2p.dnet_enable();
self.p2p_handler.p2p.dnet_enable();
} else {
self.p2p.dnet_disable();
self.p2p_handler.p2p.dnet_disable();
}
JsonResponse::new(JsonValue::Boolean(true), id).into()
@@ -224,6 +224,6 @@ impl Darkfid {
impl HandlerP2p for Darkfid {
fn p2p(&self) -> P2pPtr {
self.p2p.clone()
self.p2p_handler.p2p.clone()
}
}

View File

@@ -133,8 +133,8 @@ impl Darkfid {
return server_error(RpcError::TxSimulationFail, id, None)
};
self.p2p.broadcast(&tx).await;
if !self.p2p.is_connected() {
self.p2p_handler.p2p.broadcast(&tx).await;
if !self.p2p_handler.p2p.is_connected() {
warn!(target: "darkfid::rpc::tx_broadcast", "No connected channels to broadcast tx");
}

View File

@@ -166,7 +166,7 @@ async fn replicator_task(node: Arc<Darkfid>, ex: Arc<smol::Executor<'static>>) -
let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
// Subscribe to the network disconnect subscriber
let net_subscription = node.p2p.hosts().subscribe_disconnect().await;
let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
let result = smol::future::or(
monitor_network(&net_subscription),

View File

@@ -338,7 +338,7 @@ async fn mine_next_block(
extended_fork.module.verify_current_block(&next_block)?;
// Check if we are connected to the network
if !skip_sync && !node.p2p.is_connected() {
if !skip_sync && !node.p2p_handler.p2p.is_connected() {
return Err(Error::NetworkNotConnected)
}
@@ -348,7 +348,7 @@ async fn mine_next_block(
// Broadcast proposal to the network
let message = ProposalMessage(proposal);
node.p2p.broadcast(&message).await;
node.p2p_handler.p2p.broadcast(&message).await;
Ok(())
}

View File

@@ -138,11 +138,11 @@ async fn synced_peers(
checkpoint: Option<(u32, HeaderHash)>,
) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
let comms_timeout = node.p2p.settings().read().await.outbound_connect_timeout;
let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
let mut tips = HashMap::new();
loop {
// Grab channels
let peers = node.p2p.hosts().channels();
let peers = node.p2p_handler.p2p.hosts().channels();
// Ask each peer(if we got any) if they are synced
for peer in peers {
@@ -210,7 +210,7 @@ async fn synced_peers(
}
warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
let subscription = node.p2p.hosts().subscribe_channel().await;
let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
let _ = subscription.receive().await;
subscription.unsubscribe().await;
@@ -270,7 +270,7 @@ async fn retrieve_headers(
}
}
}
let comms_timeout = node.p2p.settings().read().await.outbound_connect_timeout;
let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
// We subtract 1 since tip_height is increased by one
let total = tip_height - last_known - 1;
@@ -390,7 +390,7 @@ async fn retrieve_blocks(
}
}
}
let comms_timeout = node.p2p.settings().read().await.outbound_connect_timeout;
let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
let mut received_blocks = 0;
let total = node.validator.blockchain.headers.len_sync();
@@ -505,7 +505,7 @@ async fn sync_best_fork(node: &Darkfid, peers: &[ChannelPtr], last_tip: &HeaderH
// Node waits for response
let Ok(response) = response_sub
.receive_with_timeout(node.p2p.settings().read().await.outbound_connect_timeout)
.receive_with_timeout(node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout)
.await
else {
debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");

View File

@@ -41,7 +41,11 @@ use num_bigint::BigUint;
use sled_overlay::sled;
use url::Url;
use crate::{proto::ProposalMessage, task::sync::sync_task, utils::spawn_p2p, Darkfid};
use crate::{
proto::{DarkfidP2pHandler, ProposalMessage},
task::sync::sync_task,
Darkfid,
};
pub struct HarnessConfig {
pub pow_target: u32,
@@ -150,7 +154,7 @@ impl Harness {
let proposal = Proposal::new(block.clone());
self.alice.validator.append_proposal(&proposal).await?;
let message = ProposalMessage(proposal);
self.alice.p2p.broadcast(&message).await;
self.alice.p2p_handler.p2p.broadcast(&message).await;
}
// Sleep a bit so blocks can be propagated and then
@@ -242,10 +246,19 @@ pub async fn generate_node(
// We initialize a dnet subscriber but do not activate it.
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let p2p = spawn_p2p(settings, &validator, &subscribers, ex.clone()).await?;
let node = Darkfid::new(p2p.clone(), validator, miner, 50, subscribers, None, dnet_sub).await;
let p2p_handler = DarkfidP2pHandler::init(settings, ex).await?;
let node = Darkfid::new(
p2p_handler.clone(),
validator.clone(),
miner,
50,
subscribers.clone(),
None,
dnet_sub,
)
.await;
p2p.start().await?;
p2p_handler.clone().start(ex, &validator, &subscribers).await?;
node.validator.consensus.generate_empty_fork().await?;

View File

@@ -90,7 +90,7 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let charlie_url = Url::parse("tcp+tls://127.0.0.1:18342")?;
settings.inbound_addrs = vec![charlie_url];
let bob_url = th.bob.p2p.settings().read().await.inbound_addrs[0].clone();
let bob_url = th.bob.p2p_handler.p2p.settings().read().await.inbound_addrs[0].clone();
settings.peers = vec![bob_url];
let charlie = generate_node(
&th.vks,

View File

@@ -67,7 +67,7 @@ async fn sync_forks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let charlie_url = Url::parse("tcp+tls://127.0.0.1:18442")?;
settings.inbound_addrs = vec![charlie_url];
let bob_url = th.bob.p2p.settings().read().await.inbound_addrs[0].clone();
let bob_url = th.bob.p2p_handler.p2p.settings().read().await.inbound_addrs[0].clone();
settings.peers = vec![bob_url];
let charlie =
generate_node(&th.vks, &th.validator_config, &settings, &ex, false, false, None).await?;

View File

@@ -16,70 +16,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashMap, sync::Arc};
use log::{debug, error, info};
use smol::{fs::read_to_string, Executor};
use log::{debug, error};
use smol::fs::read_to_string;
use structopt_toml::StructOptToml;
use darkfi::{
net::{session::SESSION_DEFAULT, P2p, P2pPtr, Settings},
rpc::jsonrpc::JsonSubscriber,
util::path::get_config_path,
validator::ValidatorPtr,
Error, Result,
};
use darkfi::{util::path::get_config_path, Error, Result};
use crate::{
proto::{ProtocolProposal, ProtocolSync, ProtocolTx},
BlockchainNetwork, CONFIG_FILE,
};
/// Auxiliary function to generate the P2P network and register all its protocols.
pub async fn spawn_p2p(
settings: &Settings,
validator: &ValidatorPtr,
subscribers: &HashMap<&'static str, JsonSubscriber>,
executor: Arc<Executor<'static>>,
) -> Result<P2pPtr> {
info!(target: "darkfid", "Registering sync network P2P protocols...");
let p2p = P2p::new(settings.clone(), executor.clone()).await?;
let registry = p2p.protocol_registry();
let _validator = validator.clone();
registry
.register(SESSION_DEFAULT, move |channel, _p2p| {
let validator = _validator.clone();
async move { ProtocolSync::init(channel, validator).await.unwrap() }
})
.await;
let _validator = validator.clone();
let _subscriber = subscribers.get("proposals").unwrap().clone();
registry
.register(SESSION_DEFAULT, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
async move {
ProtocolProposal::init(channel, validator, p2p, subscriber)
.await
.unwrap()
}
})
.await;
let _validator = validator.clone();
let _subscriber = subscribers.get("txs").unwrap().clone();
registry
.register(SESSION_DEFAULT, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
async move { ProtocolTx::init(channel, validator, p2p, subscriber).await.unwrap() }
})
.await;
Ok(p2p)
}
use crate::{BlockchainNetwork, CONFIG_FILE};
/// Auxiliary function to parse darkfid configuration file and extract requested
/// blockchain network config.