repo: updated everything to merged stuff

This commit is contained in:
aggstam
2023-06-29 16:13:43 +03:00
parent b4d28da805
commit 1ea8adfb28
16 changed files with 41 additions and 70 deletions

View File

@@ -22,8 +22,9 @@ use async_std::sync::Arc;
use async_trait::async_trait;
use darkfi::{
dht2::net_hashmap::{NetHashMapInsert, NetHashMapRemove},
impl_p2p_message,
net::{
self, ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
@@ -52,11 +53,7 @@ pub struct ChunkRequest {
pub hash: blake3::Hash,
}
impl net::Message for ChunkRequest {
fn name() -> &'static str {
"dhtchunkrequest"
}
}
impl_p2p_message!(ChunkRequest, "dhtchunkrequest");
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ChunkReply {
@@ -64,22 +61,14 @@ pub struct ChunkReply {
pub data: Vec<u8>,
}
impl net::Message for ChunkReply {
fn name() -> &'static str {
"dhtchunkreply"
}
}
impl_p2p_message!(ChunkReply, "dhtchunkreply");
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FileRequest {
pub hash: blake3::Hash,
}
impl net::Message for FileRequest {
fn name() -> &'static str {
"dhtfilerequest"
}
}
impl_p2p_message!(FileRequest, "dhtfilerequest");
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FileReply {
@@ -87,16 +76,12 @@ pub struct FileReply {
pub chunks: Vec<blake3::Hash>,
}
impl net::Message for FileReply {
fn name() -> &'static str {
"dhtfilereply"
}
}
impl_p2p_message!(FileReply, "dhtfilereply");
impl ProtocolDht {
#[allow(dead_code)]
pub async fn init(channel: ChannelPtr, p2p: P2pPtr, state: DhtdPtr) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<NetHashMapInsert<blake3::Hash, Vec<blake3::Hash>>>().await;
msg_subsystem.add_dispatch::<NetHashMapRemove<blake3::Hash>>().await;
msg_subsystem.add_dispatch::<ChunkRequest>().await;
@@ -137,7 +122,7 @@ impl ProtocolDht {
state.routing_table.entry(msg.k).or_insert_with(HashSet::new);
let hashset = state.routing_table.get_mut(&msg.k).unwrap();
hashset.insert(self.channel.address());
hashset.insert(self.channel.address().clone());
}
}

View File

@@ -25,7 +25,7 @@ use async_std::{
};
use darkfi::{
dht2::{Dht, MAX_CHUNK_SIZE},
net::{self, transport::TransportName, P2p},
net::{self, P2p},
util::async_util::{msleep, sleep},
Result,
};
@@ -52,9 +52,9 @@ async fn dht_remote_get_insert_real(ex: Arc<Executor<'_>>) -> Result<()> {
drop(listener);
let settings = net::Settings {
inbound: vec![url.clone()],
inbound_addrs: vec![url.clone()],
peers: addrs.clone(),
outbound_transports: vec![TransportName::Tcp(None)],
allowed_transports: vec!["tcp".into()],
localnet: true,
..Default::default()
};
@@ -88,7 +88,6 @@ async fn dht_remote_get_insert_real(ex: Arc<Executor<'_>>) -> Result<()> {
dhtds.push(dhtd);
p2p.wait_for_outbound(ex.clone()).await?;
sleep(1).await;
}

View File

@@ -134,7 +134,7 @@ async fn main() -> Result<()> {
let args = Args::parse();
let log_level = get_log_level(args.verbose.into());
let log_config = get_log_config();
let log_config = get_log_config(args.verbose.into());
TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?;
let rpc_client = RpcClient::new(args.endpoint).await?;

View File

@@ -73,7 +73,7 @@ struct Args {
#[structopt(long, default_value = "8")]
/// Connection slots
slots: u32,
slots: usize,
#[structopt(long)]
/// Connect to seed (repeatable flag)
@@ -397,14 +397,13 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
// P2P network
let network_settings = net::Settings {
inbound: args.p2p_accept,
inbound_addrs: args.p2p_accept,
outbound_connections: args.slots,
external_addr: args.p2p_external,
external_addrs: args.p2p_external,
peers: args.peers.clone(),
seeds: args.seeds.clone(),
outbound_transports: net::settings::get_outbound_transports(args.transports),
allowed_transports: args.transports,
localnet: args.localnet,
channel_log: args.channel_log,
..Default::default()
};
@@ -434,9 +433,6 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
})
.detach();
info!("Waiting for P2P outbound connections");
p2p.wait_for_outbound(ex).await?;
fud.init().await?;
// Wait for SIGINT

View File

@@ -67,7 +67,7 @@ async fn main() -> Result<()> {
let args = Args::parse();
let log_level = get_log_level(args.verbose.into());
let log_config = get_log_config();
let log_config = get_log_config(args.verbose.into());
TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?;
let rpc_client = RpcClient::new(args.endpoint).await?;

View File

@@ -99,8 +99,7 @@ async fn realmain(args: Args, executor: Arc<smol::Executor<'_>>) -> Result<()> {
let seen_inv = Seen::new();
// Check the version
let mut net_settings = args.net.clone();
net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string());
let net_settings = args.net.clone();
// New p2p
let p2p = net::P2p::new(net_settings.into()).await;

View File

@@ -111,7 +111,7 @@ impl JsonRpcInterface {
return JsonResponse::new(json, id).into()
}
self.p2p.broadcast(event).await.unwrap();
self.p2p.broadcast(&event).await;
let json = json!(true);
JsonResponse::new(json, id).into()

View File

@@ -165,7 +165,7 @@ async fn main() -> Result<()> {
let args = Args::parse();
let log_level = get_log_level(args.verbose.into());
let log_config = get_log_config();
let log_config = get_log_config(args.verbose.into());
TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?;
let rpc_client = RpcClient::new(args.endpoint).await?;

View File

@@ -166,7 +166,7 @@ async fn start_sync_loop(
timestamp: Timestamp::current_time(),
};
p2p.broadcast(event).await?;
p2p.broadcast(&event).await;
}
}
@@ -331,8 +331,7 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'_>>) -> Result<(
//
// P2p setup
//
let mut net_settings = settings.net.clone();
net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string());
let net_settings = settings.net.clone();
let p2p = net::P2p::new(net_settings.into()).await;
let registry = p2p.protocol_registry();

View File

@@ -19,19 +19,15 @@
// ANCHOR: msg
use async_std::sync::{Arc, Mutex};
use darkfi::net;
use darkfi::{impl_p2p_message, net::Message};
use darkfi_serial::{SerialDecodable, SerialEncodable};
pub type DchatMsgsBuffer = Arc<Mutex<Vec<DchatMsg>>>;
impl net::Message for DchatMsg {
fn name() -> &'static str {
"DchatMsg"
}
}
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct DchatMsg {
pub msg: String,
}
impl_p2p_message!(DchatMsg, "DchatMsg");
// ANCHOR_END: msg

View File

@@ -113,7 +113,7 @@ impl Dchat {
debug!(target: "dchat", "Dchat::register_protocol() [START]");
let registry = self.p2p.protocol_registry();
registry
.register(!net::SESSION_SEED, move |channel, _p2p| {
.register(!net::session::SESSION_SEED, move |channel, _p2p| {
let msgs2 = msgs.clone();
async move { ProtocolDchat::init(channel, msgs2).await }
})
@@ -145,7 +145,7 @@ impl Dchat {
// ANCHOR: send
async fn send(&self, msg: String) -> Result<()> {
let dchatmsg = DchatMsg { msg };
self.p2p.broadcast(dchatmsg).await?;
self.p2p.broadcast(&dchatmsg).await;
Ok(())
}
// ANCHOR_END: send
@@ -179,8 +179,8 @@ fn alice() -> Result<AppSettings> {
let ext_addr = Url::parse("tcp://127.0.0.1:51554").unwrap();
let net = Settings {
inbound: vec![inbound],
external_addr: vec![ext_addr],
inbound_addrs: vec![inbound],
external_addrs: vec![ext_addr],
seeds: vec![seed],
localnet: true,
..Default::default()
@@ -205,7 +205,7 @@ fn bob() -> Result<AppSettings> {
let seed = Url::parse("tcp://127.0.0.1:50515").unwrap();
let net = Settings {
inbound: vec![],
inbound_addrs: vec![],
outbound_connections: 5,
seeds: vec![seed],
localnet: true,

View File

@@ -36,7 +36,7 @@ pub struct ProtocolDchat {
impl ProtocolDchat {
pub async fn init(channel: net::ChannelPtr, msgs: DchatMsgsBuffer) -> net::ProtocolBasePtr {
debug!(target: "dchat", "ProtocolDchat::init() [START]");
let message_subsytem = channel.get_message_subsystem();
let message_subsytem = channel.message_subsystem();
message_subsytem.add_dispatch::<DchatMsg>().await;
let msg_sub =

View File

@@ -55,7 +55,7 @@ pub struct DarkCli {
pub connect: Option<Vec<String>>,
/// connections slots
#[clap(long)]
pub connect_slots: Option<u32>,
pub connect_slots: Option<usize>,
/// RPC port
#[clap(long)]
pub rpc_port: Option<String>,
@@ -93,9 +93,9 @@ impl ProgramOptions {
Ok(ProgramOptions {
network_settings: net::Settings {
inbound: accept_addr.clone(),
inbound_addrs: accept_addr.clone(),
outbound_connections: connection_slots,
external_addr: accept_addr,
external_addrs: accept_addr,
peers: manual_connects,
seeds: seed_addrs,
..Default::default()
@@ -108,7 +108,7 @@ fn main() -> Result<()> {
let options = ProgramOptions::load()?;
let lvl = get_log_level(1);
let conf = get_log_config();
let conf = get_log_config(1);
TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?;

View File

@@ -142,10 +142,7 @@ impl Dht {
Some(_) => {
debug!(target: "dht", "Key removed: {}", key);
let request = LookupRequest::new(self.id, key, 1);
if let Err(e) = self.p2p.broadcast(&request).await {
error!(target: "dht", "Failed broadcasting request: {}", e);
return Err(e)
}
self.p2p.broadcast(&request).await;
self.lookup_remove(key, self.id)
}
@@ -225,10 +222,7 @@ impl Dht {
let peer = *peers.iter().last().unwrap();
let request = KeyRequest::new(self.id, peer, key);
// TODO: ask connected peers directly, not broadcast
if let Err(e) = self.p2p.broadcast(request).await {
error!(target: "dht", "Failed broadcasting request: {}", e);
return Err(e)
}
self.p2p.broadcast(&request).await;
Ok(())
}

View File

@@ -55,7 +55,7 @@ impl Protocol {
p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
debug!(target: "dht::protocol", "Adding Protocol to the protocol registry");
let msg_subsystem = channel.get_message_subsystem();
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<KeyRequest>().await;
msg_subsystem.add_dispatch::<KeyResponse>().await;
msg_subsystem.add_dispatch::<LookupRequest>().await;

View File

@@ -30,6 +30,9 @@ pub mod consensus;
#[cfg(feature = "blockchain")]
pub mod validator;
#[cfg(feature = "dht")]
pub mod dht;
#[cfg(feature = "dht")]
pub mod dht2;