darkfid: P2P protocol stub.

This commit is contained in:
parazyd
2022-04-15 17:25:20 +02:00
parent d77d4dc02c
commit fab196067b
3 changed files with 80 additions and 0 deletions

View File

@@ -44,6 +44,9 @@ use client::Client;
mod error;
use error::{server_error, RpcError};
mod protocol;
use protocol::ProtocolProposal;
const CONFIG_FILE: &str = "darkfid_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml");
@@ -337,6 +340,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
let p2p = net::P2p::new(network_settings).await;
let registry = p2p.protocol_registry();
// TODO: Register protocols
debug!("Adding ProtocolProposal to the protocol registry");
info!("Starting P2P networking");
p2p.clone().start(ex.clone()).await?;

View File

@@ -0,0 +1,2 @@
mod protocol_participant;
pub use protocol_participant::ProtocolParticipant;

View File

@@ -0,0 +1,74 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use darkfi::{
consensus::{participant::Participant, state::ValidatorStatePtr},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
pub struct ProtocolParticipant {
participant_sub: MessageSubscription<Participant>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolParticipant {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<Participant>().await;
let participant_sub = channel.subscribe_msg::<Participant>().await?;
Ok(Arc::new(Self {
participant_sub,
jobsman: ProtocolJobsManager::new("ParticipantProtocol", channel),
state,
p2p,
}))
}
async fn handle_receive_participant(self: Arc<Self>) -> Result<()> {
debug!("ProtocolParticipant::handle_receive_participant() [START]");
loop {
let participant = self.participant_sub.receive().await?;
debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant);
if self.state.write().unwrap().append_participant((*participant).clone()) {
let pending_participants =
self.state.read().unwrap().consensus.pending_participants.clone();
for participant in pending_participants {
self.p2p.broadcast(participant).await?;
}
}
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolParticipant {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!("ProtocolParticipant::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman
.clone()
.spawn(self.clone().handle_receive_participant(), executor.clone())
.await;
debug!("ProtocolParticipant::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolParticipant"
}
}