darkfid: remove second miners p2p

The usage of a second p2p network for miners was a premature optimization for faster block propagation between block producers, but in reality we don't know if its required yet, therefore we eliminate the extra complexity it introduces
This commit is contained in:
skoupidi
2024-03-08 14:40:11 +02:00
parent 99d149dd9b
commit c6637029fe
21 changed files with 143 additions and 593 deletions

View File

@@ -40,8 +40,8 @@ recipient = "5ZHfYpt4mpJcwBNxfEyxLzeFJUEeoePs5NQ5jVEgHrMf"
# Skip syncing process and start node right away
skip_sync = true
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:8242"]
@@ -94,56 +94,6 @@ localnet = true
# Time between peer discovery attempts
#outbound_peer_discovery_attempt_time = 5
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
#inbound = ["tcp+tls://0.0.0.0:8241"]
# P2P external addresses the instance advertises so other peers can
# reach us and connect to us, as long as inbound addrs are configured.
#external_addrs = []
# Peer nodes to manually connect to
#peers = []
# Seed nodes to connect to for peer discovery and/or adversising our
# own external addresses
#seeds = []
# Whitelisted network transports for outbound connections
#allowed_transports = ["tcp+tls"]
# Allow transport mixing (e.g. Tor would be allowed to connect to `tcp://`)
#transport_mixing = true
# Outbound connection slots number, this many connections will be
# attempted. (This does not include manual connections)
#outbound_connections = 8
# Manual connections retry limit, 0 for forever looping
#manual_attempt_limit = 0
# Outbound connection timeout (in seconds)
#outbound_connect_timeout = 10
# Exchange versions (handshake) timeout (in seconds)
#channel_handshake_timeout = 4
# Ping-pong exchange execution interval (in seconds)
#channel_heartbeat_interval = 10
# Allow localnet hosts
localnet = true
# Delete a peer from hosts if they've been quarantined N times
#hosts_quarantine_limit = 50
# Cooling off time for peer discovery when unsuccessful
#outbound_peer_discovery_cooloff_time = 30
# Time between peer discovery attempts
#outbound_peer_discovery_attempt_time = 5
# Testnet blockchain network configuration
[network_config."testnet"]
# Path to the blockchain database directory
@@ -167,8 +117,8 @@ miner = false
# Skip syncing process and start node right away
skip_sync = false
## Testnet sync P2P network settings
[network_config."testnet".sync_net]
## Testnet P2P network settings
[network_config."testnet".net]
# P2P accept addresses the instance listens on for inbound connections
# You can also use an IPv6 address
inbound = ["tcp+tls://0.0.0.0:8342"]
@@ -231,66 +181,6 @@ localnet = false
# Time between peer discovery attempts
#outbound_peer_discovery_attempt_time = 5
## Testnet miners P2P network settings
[network_config."testnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
# You can also use an IPv6 address
inbound = ["tcp+tls://0.0.0.0:8341"]
# IPv6 version:
#inbound = ["tcp+tls://[::]:8341"]
# Combined:
#inbound = ["tcp+tls://0.0.0.0:8341", "tcp+tls://[::]:8341"]
# P2P external addresses the instance advertises so other peers can
# reach us and connect to us, as long as inbound addrs are configured.
# You can also use an IPv6 address
#external_addrs = ["tcp+tls://XXX.XXX.XXX.XXX:8341"]
# IPv6 version:
#external_addrs = ["tcp+tls://[ipv6 address here]:8341"]
# Combined:
#external_addrs = ["tcp+tls://XXX.XXX.XXX.XXX:8341", "tcp+tls://[ipv6 address here]:8341"]
# Peer nodes to manually connect to
#peers = []
# Seed nodes to connect to for peer discovery and/or adversising our
# own external addresses
seeds = ["tcp+tls://lilith0.dark.fi:8341", "tcp+tls://lilith1.dark.fi:8341"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Allow transport mixing (e.g. Tor would be allowed to connect to `tcp://`)
#transport_mixing = true
# Outbound connection slots number, this many connections will be
# attempted. (This does not include manual connections)
#outbound_connections = 8
# Manual connections retry limit, 0 for forever looping
#manual_attempt_limit = 0
# Outbound connection timeout (in seconds)
#outbound_connect_timeout = 10
# Exchange versions (handshake) timeout (in seconds)
#channel_handshake_timeout = 4
# Ping-pong exchange execution interval (in seconds)
#channel_heartbeat_interval = 10
# Allow localnet hosts
localnet = false
# Delete a peer from hosts if they've been quarantined N times
#hosts_quarantine_limit = 50
# Cooling off time for peer discovery when unsuccessful
#outbound_peer_discovery_cooloff_time = 30
# Time between peer discovery attempts
#outbound_peer_discovery_attempt_time = 5
# Mainnet blockchain network configuration
[network_config."mainnet"]
# Path to the blockchain database directory
@@ -314,8 +204,8 @@ miner = false
# Skip syncing process and start node right away
skip_sync = false
## Mainnet sync P2P network settings
[network_config."mainnet".sync_net]
## Mainnet P2P network settings
[network_config."mainnet".net]
# P2P accept addresses the instance listens on for inbound connections
# You can also use an IPv6 address
inbound = ["tcp+tls://0.0.0.0:8442"]
@@ -377,63 +267,3 @@ localnet = false
# Time between peer discovery attempts
#outbound_peer_discovery_attempt_time = 5
## Mainnet miners P2P network settings
[network_config."mainnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
# You can also use an IPv6 address
inbound = ["tcp+tls://0.0.0.0:8441"]
# IPv6 version:
#inbound = ["tcp+tls://[::]:8441"]
# Combined:
#inbound = ["tcp+tls://0.0.0.0:8441", "tcp+tls://[::]:8441"]
# P2P external addresses the instance advertises so other peers can
# reach us and connect to us, as long as inbound addrs are configured.
# You can also use an IPv6 address
#external_addrs = ["tcp+tls://XXX.XXX.XXX.XXX:8441"]
# IPv6 version:
#external_addrs = ["tcp+tls://[ipv6 address here]:8441"]
# Combined:
#external_addrs = ["tcp+tls://XXX.XXX.XXX.XXX:8441", "tcp+tls://[ipv6 address here]:8441"]
# Peer nodes to manually connect to
#peers = []
# Seed nodes to connect to for peer discovery and/or adversising our
# own external addresses
seeds = ["tcp+tls://lilith0.dark.fi:8441", "tcp+tls://lilith1.dark.fi:8441"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Allow transport mixing (e.g. Tor would be allowed to connect to `tcp://`)
#transport_mixing = true
# Outbound connection slots number, this many connections will be
# attempted. (This does not include manual connections)
#outbound_connections = 8
# Manual connections retry limit, 0 for forever looping
#manual_attempt_limit = 0
# Outbound connection timeout (in seconds)
#outbound_connect_timeout = 10
# Exchange versions (handshake) timeout (in seconds)
#channel_handshake_timeout = 4
# Ping-pong exchange execution interval (in seconds)
#channel_heartbeat_interval = 10
# Allow localnet hosts
localnet = false
# Delete a peer from hosts if they've been quarantined N times
#hosts_quarantine_limit = 50
# Cooling off time for peer discovery when unsuccessful
#outbound_peer_discovery_cooloff_time = 30
# Time between peer discovery attempts
#outbound_peer_discovery_attempt_time = 5

View File

@@ -65,7 +65,7 @@ mod proto;
/// Utility functions
mod utils;
use utils::{parse_blockchain_config, spawn_miners_p2p, spawn_sync_p2p};
use utils::{parse_blockchain_config, spawn_p2p};
const CONFIG_FILE: &str = "darkfid_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml");
@@ -138,23 +138,19 @@ pub struct BlockchainNetwork {
/// Skip syncing process and start node right away
pub skip_sync: bool,
/// Syncing network settings
/// P2P network settings
#[structopt(flatten)]
pub sync_net: SettingsOpt,
/// Miners network settings
#[structopt(flatten)]
pub miners_net: SettingsOpt,
pub net: SettingsOpt,
}
/// Daemon structure
pub struct Darkfid {
/// Syncing P2P network pointer
sync_p2p: P2pPtr,
/// Optional miners P2P network pointer
miners_p2p: Option<P2pPtr>,
/// P2P network pointer
p2p: P2pPtr,
/// Validator(node) pointer
validator: ValidatorPtr,
/// Flag to specify node is a miner
miner: bool,
/// A map of various subscribers exporting live info from the blockchain
subscribers: HashMap<&'static str, JsonSubscriber>,
/// JSON-RPC connection tracker
@@ -165,16 +161,16 @@ pub struct Darkfid {
impl Darkfid {
pub async fn new(
sync_p2p: P2pPtr,
miners_p2p: Option<P2pPtr>,
p2p: P2pPtr,
validator: ValidatorPtr,
miner: bool,
subscribers: HashMap<&'static str, JsonSubscriber>,
rpc_client: Option<RpcChadClient>,
) -> Self {
Self {
sync_p2p,
miners_p2p,
p2p,
validator,
miner,
subscribers,
rpc_connections: Mutex::new(HashSet::new()),
rpc_client,
@@ -236,46 +232,27 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
// Initialize syncing P2P network
let sync_p2p = spawn_sync_p2p(
&blockchain_config.sync_net.into(),
&validator,
&subscribers,
ex.clone(),
blockchain_config.miner,
)
.await;
// Initialize P2P network
let p2p = spawn_p2p(&blockchain_config.net.into(), &validator, &subscribers, ex.clone()).await;
// Initialize miners P2P network
let (miners_p2p, rpc_client) = if blockchain_config.miner {
// Initialize JSON-RPC client to perform requests to minerd
let rpc_client = if blockchain_config.miner {
let Ok(rpc_client) =
RpcChadClient::new(blockchain_config.minerd_endpoint, ex.clone()).await
else {
error!(target: "darkfid", "Failed to initialize miner daemon rpc client, check if minerd is running");
return Err(Error::RpcClientStopped)
};
(
Some(
spawn_miners_p2p(
&blockchain_config.miners_net.into(),
&validator,
&subscribers,
ex.clone(),
sync_p2p.clone(),
)
.await,
),
Some(rpc_client),
)
Some(rpc_client)
} else {
(None, None)
None
};
// Initialize node
let darkfid = Darkfid::new(
sync_p2p.clone(),
miners_p2p.clone(),
p2p.clone(),
validator.clone(),
blockchain_config.miner,
subscribers,
rpc_client,
)
@@ -311,17 +288,8 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!(target: "darkfid", "Starting sync P2P network");
sync_p2p.clone().start().await?;
// Start miners P2P network
if blockchain_config.miner {
info!(target: "darkfid", "Starting miners P2P network");
let miners_p2p = miners_p2p.clone().unwrap();
miners_p2p.clone().start().await?;
} else {
info!(target: "darkfid", "Not starting miners P2P network");
}
info!(target: "darkfid", "Starting P2P network");
p2p.clone().start().await?;
// Sync blockchain
if !blockchain_config.skip_sync {
@@ -386,13 +354,8 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
info!(target: "darkfid", "Stopping JSON-RPC server...");
rpc_task.stop().await;
info!(target: "darkfid", "Stopping syncing P2P network...");
sync_p2p.stop().await;
if blockchain_config.miner {
info!(target: "darkfid", "Stopping miners P2P network...");
miners_p2p.unwrap().stop().await;
}
info!(target: "darkfid", "Stopping P2P network...");
p2p.stop().await;
info!(target: "darkfid", "Stopping consensus task...");
consensus_task.stop().await;

View File

@@ -46,13 +46,12 @@ impl_p2p_message!(ProposalMessage, "proposal");
pub struct ProtocolProposal {
proposal_sub: MessageSubscription<ProposalMessage>,
proposals_response_sub: MessageSubscription<ForkSyncResponse>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
channel: ChannelPtr,
subscriber: JsonSubscriber,
miner: bool,
sync_p2p: Option<P2pPtr>,
}
impl ProtocolProposal {
@@ -61,8 +60,6 @@ impl ProtocolProposal {
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: JsonSubscriber,
miner: bool,
sync_p2p: Option<P2pPtr>,
) -> Result<ProtocolBasePtr> {
debug!(
target: "darkfid::proto::protocol_proposal::init",
@@ -70,20 +67,18 @@ impl ProtocolProposal {
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<ProposalMessage>().await;
msg_subsystem.add_dispatch::<ForkSyncRequest>().await;
msg_subsystem.add_dispatch::<ForkSyncResponse>().await;
let proposal_sub = channel.subscribe_msg::<ProposalMessage>().await?;
let proposals_response_sub = channel.subscribe_msg::<ForkSyncResponse>().await?;
Ok(Arc::new(Self {
proposal_sub,
proposals_response_sub,
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()),
validator,
p2p,
channel,
subscriber,
miner,
sync_p2p,
}))
}
@@ -112,23 +107,11 @@ impl ProtocolProposal {
continue
}
// Check if node is connected to the miners network
if self.miner {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"Node is connected to the miners network, skipping..."
);
continue
}
let proposal_copy = (*proposal).clone();
match self.validator.append_proposal(&proposal_copy.0).await {
Ok(()) => {
self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await;
if let Some(sync_p2p) = self.sync_p2p.as_ref() {
sync_p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await;
}
let enc_prop =
JsonValue::String(base64::encode(&serialize_async(&proposal_copy).await));
self.subscriber.notify(vec![enc_prop].into()).await;
@@ -152,13 +135,19 @@ impl ProtocolProposal {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence");
let last = self.validator.blockchain.last()?;
let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal_copy.0.hash) };
let proposals_response_sub = self.channel.subscribe_msg::<ForkSyncResponse>().await?;
self.channel.send(&request).await?;
// Node waits for response
let Ok(response) = proposals_response_sub.receive_with_timeout(COMMS_TIMEOUT).await
else {
continue
let response = match self
.proposals_response_sub
.receive_with_timeout(COMMS_TIMEOUT)
.await
{
Ok(r) => r,
Err(e) => {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "Asking peer for fork sequence failed: {}", e);
continue
}
};
// Verify and store retrieved proposals
@@ -192,9 +181,6 @@ impl ProtocolProposal {
self.validator.append_proposal(proposal).await?;
let message = ProposalMessage(proposal.clone());
self.p2p.broadcast_with_exclude(&message, &exclude_list).await;
if let Some(sync_p2p) = self.sync_p2p.as_ref() {
sync_p2p.broadcast_with_exclude(&message, &exclude_list).await;
}
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
self.subscriber.notify(vec![enc_prop].into()).await;

View File

@@ -107,8 +107,11 @@ impl ProtocolSync {
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<IsSyncedRequest>().await;
msg_subsystem.add_dispatch::<IsSyncedResponse>().await;
msg_subsystem.add_dispatch::<SyncRequest>().await;
msg_subsystem.add_dispatch::<SyncResponse>().await;
msg_subsystem.add_dispatch::<ForkSyncRequest>().await;
msg_subsystem.add_dispatch::<ForkSyncResponse>().await;
let is_synced_sub = channel.subscribe_msg::<IsSyncedRequest>().await?;
let request_sub = channel.subscribe_msg::<SyncRequest>().await?;

View File

@@ -48,31 +48,30 @@ impl RequestHandler for Darkfid {
// =====================
// Miscellaneous methods
// =====================
"ping" => return self.pong(req.id, req.params).await,
"clock" => return self.clock(req.id, req.params).await,
"sync_dnet_switch" => return self.sync_dnet_switch(req.id, req.params).await,
"miners_dnet_switch" => return self.miners_dnet_switch(req.id, req.params).await,
"ping_miner" => return self.ping_miner(req.id, req.params).await,
"ping" => self.pong(req.id, req.params).await,
"clock" => self.clock(req.id, req.params).await,
"dnet_switch" => self.dnet_switch(req.id, req.params).await,
"ping_miner" => self.ping_miner(req.id, req.params).await,
// ==================
// Blockchain methods
// ==================
"blockchain.get_block" => return self.blockchain_get_block(req.id, req.params).await,
"blockchain.get_tx" => return self.blockchain_get_tx(req.id, req.params).await,
"blockchain.last_known_block" => return self.blockchain_last_known_block(req.id, req.params).await,
"blockchain.lookup_zkas" => return self.blockchain_lookup_zkas(req.id, req.params).await,
"blockchain.subscribe_blocks" => return self.blockchain_subscribe_blocks(req.id, req.params).await,
"blockchain.subscribe_txs" => return self.blockchain_subscribe_txs(req.id, req.params).await,
"blockchain.subscribe_proposals" => return self.blockchain_subscribe_proposals(req.id, req.params).await,
"merge_mining_get_chain_id" => return self.merge_mining_get_chain_id(req.id, req.params).await,
"blockchain.get_block" => self.blockchain_get_block(req.id, req.params).await,
"blockchain.get_tx" => self.blockchain_get_tx(req.id, req.params).await,
"blockchain.last_known_block" => self.blockchain_last_known_block(req.id, req.params).await,
"blockchain.lookup_zkas" => self.blockchain_lookup_zkas(req.id, req.params).await,
"blockchain.subscribe_blocks" => self.blockchain_subscribe_blocks(req.id, req.params).await,
"blockchain.subscribe_txs" => self.blockchain_subscribe_txs(req.id, req.params).await,
"blockchain.subscribe_proposals" => self.blockchain_subscribe_proposals(req.id, req.params).await,
"merge_mining_get_chain_id" => self.merge_mining_get_chain_id(req.id, req.params).await,
// ===================
// Transaction methods
// ===================
"tx.simulate" => return self.tx_simulate(req.id, req.params).await,
"tx.broadcast" => return self.tx_broadcast(req.id, req.params).await,
"tx.pending" => return self.tx_pending(req.id, req.params).await,
"tx.clean_pending" => return self.tx_pending(req.id, req.params).await,
"tx.simulate" => self.tx_simulate(req.id, req.params).await,
"tx.broadcast" => self.tx_broadcast(req.id, req.params).await,
"tx.pending" => self.tx_pending(req.id, req.params).await,
"tx.clean_pending" => self.tx_pending(req.id, req.params).await,
// ==============
// Invalid method
@@ -98,13 +97,13 @@ impl Darkfid {
}
// RPCAPI:
// Activate or deactivate dnet in the sync P2P stack.
// Activate or deactivate dnet in the P2P stack.
// By sending `true`, dnet will be activated, and by sending `false` dnet
// will be deactivated. Returns `true` on success.
//
// --> {"jsonrpc": "2.0", "method": "sync_dnet_switch", "params": [true], "id": 42}
// --> {"jsonrpc": "2.0", "method": "dnet_switch", "params": [true], "id": 42}
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
async fn sync_dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult {
async fn dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 1 || !params[0].is_bool() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
@@ -113,34 +112,9 @@ impl Darkfid {
let switch = params[0].get::<bool>().unwrap();
if *switch {
self.sync_p2p.dnet_enable().await;
self.p2p.dnet_enable().await;
} else {
self.sync_p2p.dnet_disable().await;
}
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
// RPCAPI:
// Activate or deactivate dnet in the miners P2P stack.
// By sending `true`, dnet will be activated, and by sending `false` dnet
// will be deactivated. Returns `true` on success.
//
// --> {"jsonrpc": "2.0", "method": "miners_dnet_switch", "params": [true], "id": 42}
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
async fn miners_dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 1 || !params[0].is_bool() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
if self.miners_p2p.is_some() {
let switch = params[0].get::<bool>().unwrap();
if *switch {
self.miners_p2p.as_ref().unwrap().dnet_enable().await;
} else {
self.miners_p2p.as_ref().unwrap().dnet_disable().await;
}
self.p2p.dnet_disable().await;
}
JsonResponse::new(JsonValue::Boolean(true), id).into()

View File

@@ -119,28 +119,22 @@ impl Darkfid {
}
};
if self.miners_p2p.is_some() {
// Block production participants can directly perform
// the state transition check and append to their
// pending transactions store.
if self.validator.append_tx(&tx, true).await.is_err() {
error!(target: "darkfid::rpc::tx_broadcast", "Failed to append transaction to mempool");
return server_error(RpcError::TxSimulationFail, id, None)
}
// Block production participants can directly perform
// the state transition check and append to their
// pending transactions store.
let error_message = if self.miner {
"Failed to append transaction to mempool"
} else {
// We'll perform the state transition check here.
let result = self.validator.append_tx(&tx, false).await;
if result.is_err() {
error!(
target: "darkfid::rpc::tx_broadcast", "Failed to validate state transition: {}",
result.err().unwrap()
);
return server_error(RpcError::TxSimulationFail, id, None)
};
}
"Failed to validate state transition"
};
// We'll perform the state transition check here.
if let Err(e) = self.validator.append_tx(&tx, self.miner).await {
error!(target: "darkfid::rpc::tx_broadcast", "{}: {}", error_message, e);
return server_error(RpcError::TxSimulationFail, id, None)
};
self.sync_p2p.broadcast(&tx).await;
if self.sync_p2p.hosts().channels().await.is_empty() {
self.p2p.broadcast(&tx).await;
if self.p2p.hosts().channels().await.is_empty() {
error!(target: "darkfid::rpc::tx_broadcast", "Failed broadcasting tx, no connected channels");
return server_error(RpcError::TxBroadcastFail, id, None)
}

View File

@@ -195,8 +195,7 @@ async fn mine_next_block(
// Broadcast proposal to the network
let message = ProposalMessage(proposal);
node.miners_p2p.as_ref().unwrap().broadcast(&message).await;
node.sync_p2p.broadcast(&message).await;
node.p2p.broadcast(&message).await;
Ok(())
}

View File

@@ -36,15 +36,13 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
let mut peers = vec![];
loop {
// Grab channels
let channels = node.sync_p2p.hosts().channels().await;
let channels = node.p2p.hosts().channels().await;
// Check anyone is connected
if !channels.is_empty() {
// Ask each peer if they are synced
for channel in channels {
// Communication setup
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<IsSyncedResponse>().await;
let response_sub = channel.subscribe_msg::<IsSyncedResponse>().await?;
// Node creates a `IsSyncedRequest` and sends it
@@ -76,9 +74,6 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
let channel = &peers[0];
// Communication setup
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<SyncResponse>().await;
msg_subsystem.add_dispatch::<ForkSyncResponse>().await;
let block_response_sub = channel.subscribe_msg::<SyncResponse>().await?;
let proposals_response_sub = channel.subscribe_msg::<ForkSyncResponse>().await?;
let notif_sub = node.subscribers.get("blocks").unwrap();

View File

@@ -41,12 +41,7 @@ use darkfi_serial::Encodable;
use num_bigint::BigUint;
use url::Url;
use crate::{
proto::ProposalMessage,
task::sync::sync_task,
utils::{spawn_miners_p2p, spawn_sync_p2p},
Darkfid,
};
use crate::{proto::ProposalMessage, task::sync::sync_task, utils::spawn_p2p, Darkfid};
pub struct HarnessConfig {
pub pow_target: usize,
@@ -97,42 +92,19 @@ impl Harness {
// Generate validators using pregenerated vks
let (_, vks) = vks::get_cached_pks_and_vks()?;
let mut sync_settings =
Settings { localnet: true, inbound_connections: 3, ..Default::default() };
let mut miners_settings =
let mut settings =
Settings { localnet: true, inbound_connections: 3, ..Default::default() };
// Alice
let alice_url = Url::parse("tcp+tls://127.0.0.1:18340")?;
sync_settings.inbound_addrs = vec![alice_url.clone()];
let alice_miners_url = Url::parse("tcp+tls://127.0.0.1:18350")?;
miners_settings.inbound_addrs = vec![alice_miners_url.clone()];
let alice = generate_node(
&vks,
&validator_config,
&sync_settings,
Some(&miners_settings),
ex,
true,
)
.await?;
settings.inbound_addrs = vec![alice_url.clone()];
let alice = generate_node(&vks, &validator_config, &settings, ex, true, true).await?;
// Bob
let bob_url = Url::parse("tcp+tls://127.0.0.1:18341")?;
sync_settings.inbound_addrs = vec![bob_url];
sync_settings.peers = vec![alice_url];
let bob_miners_url = Url::parse("tcp+tls://127.0.0.1:18351")?;
miners_settings.inbound_addrs = vec![bob_miners_url];
miners_settings.peers = vec![alice_miners_url];
let bob = generate_node(
&vks,
&validator_config,
&sync_settings,
Some(&miners_settings),
ex,
false,
)
.await?;
settings.inbound_addrs = vec![bob_url];
settings.peers = vec![alice_url];
let bob = generate_node(&vks, &validator_config, &settings, ex, true, false).await?;
Ok(Self { config, vks, validator_config, alice, bob })
}
@@ -179,7 +151,7 @@ impl Harness {
let proposal = Proposal::new(block.clone())?;
self.alice.validator.append_proposal(&proposal).await?;
let message = ProposalMessage(proposal);
self.alice.miners_p2p.as_ref().unwrap().broadcast(&message).await;
self.alice.p2p.broadcast(&message).await;
}
// Sleep a bit so blocks can be propagated and then
@@ -258,9 +230,9 @@ impl Harness {
pub async fn generate_node(
vks: &Vec<(Vec<u8>, String, Vec<u8>)>,
config: &ValidatorConfig,
sync_settings: &Settings,
miners_settings: Option<&Settings>,
settings: &Settings,
ex: &Arc<smol::Executor<'static>>,
miner: bool,
skip_sync: bool,
) -> Result<Darkfid> {
let sled_db = sled::Config::new().temporary(true).open()?;
@@ -273,28 +245,10 @@ pub async fn generate_node(
subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
let (sync_p2p, miners_p2p) = if let Some(settings) = miners_settings {
let sync_p2p =
spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone(), true).await;
let miners_p2p = Some(
spawn_miners_p2p(settings, &validator, &subscribers, ex.clone(), sync_p2p.clone())
.await,
);
(sync_p2p, miners_p2p)
} else {
let sync_p2p =
spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone(), false).await;
(sync_p2p, None)
};
let node =
Darkfid::new(sync_p2p.clone(), miners_p2p.clone(), validator, subscribers, None).await;
let p2p = spawn_p2p(settings, &validator, &subscribers, ex.clone()).await;
let node = Darkfid::new(p2p.clone(), validator, miner, subscribers, None).await;
sync_p2p.clone().start().await?;
if miners_settings.is_some() {
let miners_p2p = miners_p2p.unwrap();
miners_p2p.clone().start().await?;
}
p2p.start().await?;
if !skip_sync {
sync_task(&node).await?;

View File

@@ -70,15 +70,14 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
th.validate_fork_chains(2, vec![2, 1]).await;
// We are going to create a third node and try to sync from Bob
let mut sync_settings =
Settings { localnet: true, inbound_connections: 3, ..Default::default() };
let mut settings = Settings { localnet: true, inbound_connections: 3, ..Default::default() };
let charlie_url = Url::parse("tcp+tls://127.0.0.1:18342")?;
sync_settings.inbound_addrs = vec![charlie_url];
let bob_url = th.bob.sync_p2p.settings().inbound_addrs[0].clone();
sync_settings.peers = vec![bob_url];
settings.inbound_addrs = vec![charlie_url];
let bob_url = th.bob.p2p.settings().inbound_addrs[0].clone();
settings.peers = vec![bob_url];
let charlie =
generate_node(&th.vks, &th.validator_config, &sync_settings, None, &ex, false).await?;
generate_node(&th.vks, &th.validator_config, &settings, &ex, false, false).await?;
// Verify node synced
let alice = &th.alice.validator;
let charlie = &charlie.validator;

View File

@@ -43,35 +43,34 @@ async fn sync_forks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let th = Harness::new(config, true, &ex).await?;
// Retrieve genesis block and generate 3 forks
let previous = th.alice.validator.blockchain.last_block()?;
let genesis = th.alice.validator.blockchain.last_block()?;
// Generate a fork with 3 blocks
let block1 = th.generate_next_block(&previous).await?;
let block1 = th.generate_next_block(&genesis).await?;
let block2 = th.generate_next_block(&block1).await?;
let block3 = th.generate_next_block(&block2).await?;
th.add_blocks(&vec![block1, block2, block3]).await?;
// Generate a fork with 1 block
let block4 = th.generate_next_block(&previous).await?;
let block4 = th.generate_next_block(&genesis).await?;
th.add_blocks(&vec![block4.clone()]).await?;
// Generate a fork with 1 block
let block5 = th.generate_next_block(&previous).await?;
let block5 = th.generate_next_block(&genesis).await?;
th.add_blocks(&vec![block5.clone()]).await?;
// Check nodes have all the forks
th.validate_fork_chains(3, vec![3, 1, 1]).await;
// We are going to create a third node and try to sync from Bob
let mut sync_settings =
Settings { localnet: true, inbound_connections: 3, ..Default::default() };
let mut settings = Settings { localnet: true, inbound_connections: 3, ..Default::default() };
let charlie_url = Url::parse("tcp+tls://127.0.0.1:18342")?;
sync_settings.inbound_addrs = vec![charlie_url];
let bob_url = th.bob.sync_p2p.settings().inbound_addrs[0].clone();
sync_settings.peers = vec![bob_url];
settings.inbound_addrs = vec![charlie_url];
let bob_url = th.bob.p2p.settings().inbound_addrs[0].clone();
settings.peers = vec![bob_url];
let charlie =
generate_node(&th.vks, &th.validator_config, &sync_settings, None, &ex, false).await?;
generate_node(&th.vks, &th.validator_config, &settings, &ex, false, false).await?;
// Verify node synced the big(best) fork
let charlie_forks = charlie.validator.consensus.forks.read().await;

View File

@@ -35,32 +35,17 @@ use crate::{
BlockchainNetwork, CONFIG_FILE,
};
/// Auxiliary function to generate the sync P2P network and register all its protocols.
pub async fn spawn_sync_p2p(
/// Auxiliary function to generate the P2P network and register all its protocols.
pub async fn spawn_p2p(
settings: &Settings,
validator: &ValidatorPtr,
subscribers: &HashMap<&'static str, JsonSubscriber>,
executor: Arc<Executor<'static>>,
miner: bool,
) -> P2pPtr {
info!(target: "darkfid", "Registering sync network P2P protocols...");
let p2p = P2p::new(settings.clone(), executor.clone()).await;
let registry = p2p.protocol_registry();
let _validator = validator.clone();
let _subscriber = subscribers.get("proposals").unwrap().clone();
registry
.register(SESSION_ALL, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
async move {
ProtocolProposal::init(channel, validator, p2p, subscriber, miner, None)
.await
.unwrap()
}
})
.await;
let _validator = validator.clone();
registry
.register(SESSION_ALL, move |channel, _p2p| {
@@ -69,6 +54,20 @@ pub async fn spawn_sync_p2p(
})
.await;
let _validator = validator.clone();
let _subscriber = subscribers.get("proposals").unwrap().clone();
registry
.register(SESSION_ALL, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
async move {
ProtocolProposal::init(channel, validator, p2p, subscriber)
.await
.unwrap()
}
})
.await;
let _validator = validator.clone();
let _subscriber = subscribers.get("txs").unwrap().clone();
registry
@@ -82,37 +81,6 @@ pub async fn spawn_sync_p2p(
p2p
}
/// Auxiliary function to generate the miners P2P network and register all its protocols.
pub async fn spawn_miners_p2p(
settings: &Settings,
validator: &ValidatorPtr,
subscribers: &HashMap<&'static str, JsonSubscriber>,
executor: Arc<Executor<'static>>,
sync_p2p: P2pPtr,
) -> P2pPtr {
info!(target: "darkfid", "Registering miners network P2P protocols...");
let p2p = P2p::new(settings.clone(), executor.clone()).await;
let registry = p2p.protocol_registry();
let _validator = validator.clone();
let _subscriber = subscribers.get("proposals").unwrap().clone();
let _sync_p2p = Some(sync_p2p);
registry
.register(SESSION_ALL, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
let sync_p2p = _sync_p2p.clone();
async move {
ProtocolProposal::init(channel, validator, p2p, subscriber, false, sync_p2p)
.await
.unwrap()
}
})
.await;
p2p
}
/// Auxiliary function to parse darkfid configuration file and extract requested
/// blockchain network config.
pub async fn parse_blockchain_config(

View File

@@ -37,8 +37,8 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = true
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48242"]
@@ -47,14 +47,3 @@ allowed_transports = ["tcp+tls"]
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48241"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Allow localnet hosts
localnet = true

View File

@@ -37,8 +37,8 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48342"]
@@ -50,17 +50,3 @@ peers = ["tcp+tls://0.0.0.0:48242"]
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48341"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Peer nodes to manually connect to
peers = ["tcp+tls://0.0.0.0:48241"]
# Allow localnet hosts
localnet = true

View File

@@ -37,8 +37,8 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48442"]
@@ -50,17 +50,3 @@ peers = ["tcp+tls://0.0.0.0:48242", "tcp+tls://0.0.0.0:48342"]
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48441"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Peer nodes to manually connect to
peers = ["tcp+tls://0.0.0.0:48241", "tcp+tls://0.0.0.0:48341"]
# Allow localnet hosts
localnet = true

View File

@@ -37,8 +37,8 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48542"]
@@ -50,17 +50,3 @@ peers = ["tcp+tls://0.0.0.0:48242", "tcp+tls://0.0.0.0:48342", "tcp+tls://0.0.0.
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48541"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Peer nodes to manually connect to
peers = ["tcp+tls://0.0.0.0:48241", "tcp+tls://0.0.0.0:48341", "tcp+tls://0.0.0.0:48441"]
# Allow localnet hosts
localnet = true

View File

@@ -37,8 +37,8 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48642"]
@@ -50,17 +50,3 @@ peers = ["tcp+tls://0.0.0.0:48242", "tcp+tls://0.0.0.0:48342", "tcp+tls://0.0.0.
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48641"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Peer nodes to manually connect to
peers = ["tcp+tls://0.0.0.0:48241", "tcp+tls://0.0.0.0:48341", "tcp+tls://0.0.0.0:48441", "tcp+tls://0.0.0.0:48541"]
# Allow localnet hosts
localnet = true

View File

@@ -40,18 +40,10 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = true
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48242"]
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48241"]
# Allow localnet hosts
localnet = true

View File

@@ -37,8 +37,8 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = true
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48242"]
@@ -47,14 +47,3 @@ allowed_transports = ["tcp+tls"]
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48241"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Allow localnet hosts
localnet = true

View File

@@ -45,8 +45,8 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48342"]
@@ -58,17 +58,3 @@ peers = ["tcp+tls://0.0.0.0:48242"]
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48341"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Peer nodes to manually connect to
peers = ["tcp+tls://0.0.0.0:48241"]
# Allow localnet hosts
localnet = true

View File

@@ -32,8 +32,8 @@ miner = false
# Skip syncing process and start node right away
skip_sync = false
## Localnet sync P2P network settings
[network_config."localnet".sync_net]
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48442"]
@@ -45,17 +45,3 @@ peers = ["tcp+tls://0.0.0.0:48242", "tcp+tls://0.0.0.0:48342"]
# Allow localnet hosts
localnet = true
## Localnet miners P2P network settings
[network_config."localnet".miners_net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:48441"]
# Whitelisted network transports for outbound connections
allowed_transports = ["tcp+tls"]
# Peer nodes to manually connect to
peers = ["tcp+tls://0.0.0.0:48241", "tcp+tls://0.0.0.0:48341"]
# Allow localnet hosts
localnet = true