diff --git a/bin/darkfid2/src/main.rs b/bin/darkfid2/src/main.rs index e97a6aa65..69acdb512 100644 --- a/bin/darkfid2/src/main.rs +++ b/bin/darkfid2/src/main.rs @@ -44,6 +44,9 @@ use client::Client; mod error; use error::{server_error, RpcError}; +mod protocol; +use protocol::ProtocolProposal; + const CONFIG_FILE: &str = "darkfid_config.toml"; const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml"); @@ -337,6 +340,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let p2p = net::P2p::new(network_settings).await; let registry = p2p.protocol_registry(); // TODO: Register protocols + debug!("Adding ProtocolProposal to the protocol registry"); info!("Starting P2P networking"); p2p.clone().start(ex.clone()).await?; diff --git a/bin/darkfid2/src/protocol/mod.rs b/bin/darkfid2/src/protocol/mod.rs new file mode 100644 index 000000000..1a27b7453 --- /dev/null +++ b/bin/darkfid2/src/protocol/mod.rs @@ -0,0 +1,2 @@ +mod protocol_participant; +pub use protocol_participant::ProtocolParticipant; diff --git a/bin/darkfid2/src/protocol/protocol_participant.rs b/bin/darkfid2/src/protocol/protocol_participant.rs new file mode 100644 index 000000000..d7d9e7642 --- /dev/null +++ b/bin/darkfid2/src/protocol/protocol_participant.rs @@ -0,0 +1,74 @@ +use async_executor::Executor; +use async_std::sync::Arc; +use async_trait::async_trait; +use log::debug; + +use darkfi::{ + consensus::{participant::Participant, state::ValidatorStatePtr}, + net::{ + ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + Result, +}; + +pub struct ProtocolParticipant { + participant_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, +} + +impl ProtocolParticipant { + pub async fn init( + channel: ChannelPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, + ) -> Result { + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + + let participant_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + participant_sub, + jobsman: ProtocolJobsManager::new("ParticipantProtocol", channel), + state, + p2p, + })) + } + + async fn handle_receive_participant(self: Arc) -> Result<()> { + debug!("ProtocolParticipant::handle_receive_participant() [START]"); + loop { + let participant = self.participant_sub.receive().await?; + + debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant); + if self.state.write().unwrap().append_participant((*participant).clone()) { + let pending_participants = + self.state.read().unwrap().consensus.pending_participants.clone(); + for participant in pending_participants { + self.p2p.broadcast(participant).await?; + } + } + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolParticipant { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!("ProtocolParticipant::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman + .clone() + .spawn(self.clone().handle_receive_participant(), executor.clone()) + .await; + debug!("ProtocolParticipant::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolParticipant" + } +}