feat: add a TUI interface to rust-peer (#245)

* add tui ui
* add polling of events
* clean up peer code
* all hooked up
* clean up system messages
* smarter system messages
* add tcp
* fix channel handling
* drop peers when they hang up
* fix listen address is not necessarily an external address
* add better logging and peer list
* add headless, kademlia, relay, dcutr, webrtc, peer discovery
* fix clippy
* add tui ui
* add polling of events
* clean up peer code
* all hooked up
* clean up system messages
* smarter system messages
* add tcp
* fix channel handling
* drop peers when they hang up
* fix listen address is not necessarily an external address
* add better logging and peer list
* add headless, kademlia, relay, dcutr, webrtc, peer discovery
* fix clippy

Signed-off-by: Dave Grantham <dwg@linuxprogrammer.org>
This commit is contained in:
Dave Huseby
2025-05-01 20:30:37 -06:00
committed by GitHub
parent b0ef55c969
commit a1dd5f5c3b
21 changed files with 3015 additions and 699 deletions

View File

@@ -25,7 +25,7 @@ Some of the cool and cutting-edge [transport protocols](https://connectivity.lib
| [`js-peer`](./js-peer/) | Browser Chat Peer in TypeScript | ✅ | ✅ | ✅ | ❌ | ❌ |
| [`node-js-peer`](./node-js-peer/) | Node.js Chat Peer in TypeScript | ✅ | ✅ | ✅ | ✅ | ✅ |
| [`go-peer`](./go-peer/) | Chat peer implemented in Go | ✅ | ❌ | ✅ | ✅ | ✅ |
| [`rust-peer`](./rust-peer/) | Chat peer implemented in Rust | ❌ | ❌ | ✅ | ✅ | |
| [`rust-peer`](./rust-peer/) | Chat peer implemented in Rust | ❌ | ❌ | ✅ | ✅ | |
✅ - Protocol supported
❌ - Protocol not supported
@@ -42,7 +42,6 @@ There are two ways to connect to a peer:
Load the UI, and enter the multiaddr into the UI. Ensure that it includes the peerID, e.g.`/ip4/192.168.178.21/udp/61838/quic-v1/webtransport/certhash/uEiCQCALYac4V3LJ2ourLdauXOswIXpIuJ_JNT-8Wavmxyw/certhash/uEiCdYghq5FlXGkVONQXT07CteA16BDyMPI23-0GjA9Ej_w/p2p/12D3KooWF7ovRNBKPxERf6GtUbFdiqJsQviKUb7Z8a2Uuuo6MrDX`
## Getting started: Browser JS
### 1. Install dependencies
@@ -84,7 +83,7 @@ cd rust-peer
cargo run
```
This will automatically connect you to the bootstrap node running on [fly.io](https://fly.io).
This will automatically connect you to the bootstrap nodes running on bootstrap.libp2p.io.
To explore more advanced configurations if you e.g. want to set up our own network, try:

View File

@@ -1,3 +1,4 @@
target/
*.pem
local_key
local.key
local.peerid

862
rust-peer/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,16 +6,22 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0"
clap = { version = "4.1.11", features = ["derive", "env"] }
env_logger = "0.11.4"
futures = "0.3.27"
futures-timer = "3.0.2"
libp2p = { version = "0.55", features = ["identify", "ping", "tokio", "gossipsub", "macros", "relay", "kad", "rsa", "ed25519", "quic", "request-response", "dns", "memory-connection-limits"] }
anyhow = "1.0.97"
async-trait = "0.1.88"
clap = { version = "4.5.32", features = ["derive", "env"] }
crossterm = "0.28.1"
futures = "0.3.31"
futures-timer = "3.0.3"
hex = "0.4.3"
libp2p = { version = "0.55", features = ["identify", "ping", "tokio", "gossipsub", "macros", "relay", "kad", "rsa", "ed25519", "quic", "request-response", "dns", "memory-connection-limits", "tcp", "noise", "yamux", "autonat", "tls", "dcutr"] }
libp2p-webrtc = { version = "0.9.0-alpha", features = ["tokio", "pem"] }
log = "0.4.17"
quick-protobuf = "0.8.1"
rand = "0.8.5"
tokio = { version = "1.27.0", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] }
async-trait = "0.1.68"
ratatui = "0.29.0"
serde_json = "1.0.140"
signal-hook = "0.3.17"
tokio = { version = "1.44.1", features = ["full"] }
tokio-util = { version = "0.7.14", features = ["full"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
unsigned-varint = "0.8.0"

View File

@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.5-labs
FROM rust:1.72.0 as builder
FROM rust:1.85.1 as builder
RUN rustup target add x86_64-unknown-linux-musl
RUN --mount=type=cache,target=/var/cache/apt apt-get update && apt-get install -y musl-dev musl-tools
@@ -11,14 +11,23 @@ RUN --mount=type=cache,target=./target \
--mount=type=cache,target=/usr/local/cargo/registry \
cargo build --release --target x86_64-unknown-linux-musl
#RUN --mount=type=cache,target=./target \
# mv ./target/x86_64-unknown-linux-musl/release/main /usr/local/bin/rust-libp2p-webrtc-peer
RUN --mount=type=cache,target=./target \
mv ./target/x86_64-unknown-linux-musl/release/rust-libp2p-webrtc-peer /usr/local/bin/rust-libp2p-webrtc-peer
mv ./target/x86_64-unknown-linux-musl/release/main /usr/local/bin/rust-libp2p-webrtc-peer && \
ls -lh /usr/local/bin/rust-libp2p-webrtc-peer
FROM alpine:3
WORKDIR /app
COPY --from=builder /usr/local/bin/rust-libp2p-webrtc-peer /usr/bin/rust-libp2p-webrtc-peer
COPY --from=builder /usr/local/bin/rust-libp2p-webrtc-peer /usr/local/bin/rust-libp2p-webrtc-peer
RUN --mount=type=cache,target=/var/cache/apk apk add bind-tools
RUN ulimit -n 65536
RUN mkdir /app/certificates
ENV RUST_BACKTRACE=1
EXPOSE 9090
EXPOSE 9091
EXPOSE 9092
CMD ["rust-libp2p-webrtc-peer"]
CMD ["rust-libp2p-webrtc-peer", "--headless"]

110
rust-peer/src/bin/main.rs Normal file
View File

@@ -0,0 +1,110 @@
use rust_libp2p_webrtc_peer::prelude::*;
use anyhow::Result;
use clap::Parser;
use libp2p::{identity, PeerId};
use libp2p_webrtc::tokio::Certificate;
use std::path::{Path, PathBuf};
use tokio::{fs, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
// parse the command line arguments
let opt = Options::parse();
// initialize the tracing logger and get the receiver for log messages
let from_log = Log::init();
// create a shutdown token
let shutdown = CancellationToken::new();
// load the identity and certificate
let local_key = read_or_create_identity(&opt.local_key_path).await?;
let webrtc_cert = read_or_create_certificate(&opt.local_cert_path).await?;
// create the ui and the channels to communicate with it
let (mut ui, to_ui, from_ui) = if opt.headless {
Headless::build(local_key.public().into(), from_log, shutdown.clone())
} else {
Tui::build(local_key.public().into(), from_log, shutdown.clone())
};
// create the peer, connecting it to the ui
let mut peer = Peer::new(local_key, webrtc_cert, to_ui, from_ui, shutdown.clone()).await?;
// spawn tasks for both the swarm and the ui
let peer_task: JoinHandle<Result<()>> = tokio::spawn(async move { peer.run().await });
let ui_task: JoinHandle<Result<()>> = tokio::spawn(async move { ui.run().await });
// wait for the tasks to finish
let (ui_result, peer_result) = tokio::try_join!(peer_task, ui_task)?;
// check the inner results
ui_result?;
peer_result?;
Ok(())
}
async fn read_or_create_certificate(path: &Path) -> Result<Certificate> {
if path.exists() {
let pem = fs::read_to_string(&path).await?;
info!("Using existing certificate from {}", path.display());
return Ok(Certificate::from_pem(&pem)?);
}
let cert = Certificate::generate(&mut rand::thread_rng())?;
fs::write(&path, &cert.serialize_pem().as_bytes()).await?;
info!(
"Generated new certificate and wrote it to {}",
path.display()
);
Ok(cert)
}
async fn read_or_create_identity(path: &Path) -> Result<identity::Keypair> {
let mut key_path = PathBuf::from(path);
let is_key = key_path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext == "key")
.unwrap_or(false);
if !is_key {
key_path.set_extension("key");
}
let mut peer_id_path = PathBuf::from(path);
let is_peer_id = peer_id_path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext == "peerid")
.unwrap_or(false);
if !is_peer_id {
peer_id_path.set_extension("peerid");
}
if key_path.exists() {
let bytes = fs::read(&key_path).await?;
info!("Using existing identity from {}", key_path.display());
// This only works for ed25519 but that is what we are using
return Ok(identity::Keypair::from_protobuf_encoding(&bytes)?);
}
let identity = identity::Keypair::generate_ed25519();
fs::write(&key_path, &identity.to_protobuf_encoding()?).await?;
let peer_id: PeerId = identity.public().into();
fs::write(&peer_id_path, peer_id.to_string()).await?;
info!(
"Generated new identity and wrote it to {}",
key_path.display()
);
Ok(identity)
}

57
rust-peer/src/chatpeer.rs Normal file
View File

@@ -0,0 +1,57 @@
use libp2p::PeerId;
use std::fmt;
/// A wrapper for PeerId for chat peers
/// TODO: expand this to include a user-set name, and possibly a user-set avatar
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChatPeer(PeerId);
impl ChatPeer {
/// Get the peer id
pub fn id(&self) -> PeerId {
self.0
}
/// Get the peer name
pub fn name(&self) -> String {
short_id(&self.0)
}
}
impl From<ChatPeer> for PeerId {
fn from(peer: ChatPeer) -> PeerId {
peer.0
}
}
impl From<&PeerId> for ChatPeer {
fn from(peer: &PeerId) -> Self {
ChatPeer(peer.to_owned())
}
}
impl From<PeerId> for ChatPeer {
fn from(peer: PeerId) -> Self {
ChatPeer(peer)
}
}
impl fmt::Debug for ChatPeer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} ({})", &self.0, short_id(&self.0))
}
}
impl fmt::Display for ChatPeer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", short_id(&self.0))
}
}
// Get the last 8 characters of a PeerId
fn short_id(peer: &PeerId) -> String {
let s = peer.to_string();
s.chars()
.skip(s.chars().count().saturating_sub(7))
.collect()
}

View File

@@ -8,34 +8,40 @@ use libp2p::{request_response, StreamProtocol};
// To request a file a peer sends the varuint encoded length of the file id string followed by the
// file id string itself.
//
// FileRequest:
// Request:
// varuint - file id length
// bytes - file id
//
// The file response message consists of a varuint length followed by the contents of the file.
//
// FileResponse:
// Response:
// varuint - file contents length
// bytes - file contents
//
/// The codec for the file exchange protocol.
#[derive(Default, Clone)]
pub struct FileExchangeCodec;
pub struct Codec;
/// The request message for the file exchange protocol.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileRequest {
pub struct Request {
/// The identifier of the file that is being requested.
pub file_id: String,
}
/// The response message for the file exchange protocol.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileResponse {
pub struct Response {
/// The contents of the file that is being sent.
pub file_body: Vec<u8>,
}
#[async_trait]
impl request_response::Codec for FileExchangeCodec {
impl request_response::Codec for Codec {
type Protocol = StreamProtocol;
type Request = FileRequest;
type Response = FileResponse;
type Request = Request;
type Response = Response;
async fn read_request<T>(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result<Self::Request>
where
@@ -47,7 +53,7 @@ impl request_response::Codec for FileExchangeCodec {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(FileRequest {
Ok(Request {
file_id: String::from_utf8(vec).unwrap(),
})
}
@@ -66,14 +72,14 @@ impl request_response::Codec for FileExchangeCodec {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(FileResponse { file_body: vec })
Ok(Response { file_body: vec })
}
async fn write_request<T>(
&mut self,
_: &StreamProtocol,
io: &mut T,
FileRequest { file_id }: FileRequest,
Request { file_id }: Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
@@ -87,7 +93,7 @@ impl request_response::Codec for FileExchangeCodec {
&mut self,
_: &StreamProtocol,
io: &mut T,
FileResponse { file_body }: FileResponse,
Response { file_body }: Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,

View File

@@ -0,0 +1,2 @@
// Automatically generated mod.rs
pub mod peer;

View File

@@ -0,0 +1,10 @@
syntax = "proto3";
package peer;
message Peer {
// public key of the peer
bytes publicKey = 1;
// array of multiaddrs for the peer
repeated bytes multiAddrs = 2;
}

View File

@@ -0,0 +1,52 @@
// Automatically generated rust module for 'peer.proto' file
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(unused_imports)]
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use std::borrow::Cow;
use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::*;
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Peer<'a> {
pub publicKey: Cow<'a, [u8]>,
pub multiAddrs: Vec<Cow<'a, [u8]>>,
}
impl<'a> MessageRead<'a> for Peer<'a> {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.publicKey = r.read_bytes(bytes).map(Cow::Borrowed)?,
Ok(18) => msg.multiAddrs.push(r.read_bytes(bytes).map(Cow::Borrowed)?),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl<'a> MessageWrite for Peer<'a> {
fn get_size(&self) -> usize {
0
+ if self.publicKey == Cow::Borrowed(b"") { 0 } else { 1 + sizeof_len((&self.publicKey).len()) }
+ self.multiAddrs.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if self.publicKey != Cow::Borrowed(b"") { w.write_with_tag(10, |w| w.write_bytes(&**&self.publicKey))?; }
for s in &self.multiAddrs { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
Ok(())
}
}

55
rust-peer/src/lib.rs Normal file
View File

@@ -0,0 +1,55 @@
//! rust-libp2p-webrtc-peer crate
#![warn(missing_docs)]
#![deny(
trivial_casts,
trivial_numeric_casts,
unused_import_braces,
unused_qualifications
)]
/// The chat peer module
pub mod chatpeer;
pub use chatpeer::ChatPeer;
/// The peer file transfer protocol
pub mod file_exchange;
pub use file_exchange::{Codec, Request, Response};
/// The peer logging module
pub mod log;
pub use log::Log;
/// The peer message module
pub mod message;
pub use message::Message;
/// The command line options module
pub mod options;
pub use options::Options;
/// The peer module
pub mod peer;
pub use peer::Peer;
/// The protobuf generated module
mod proto {
#![allow(unreachable_pub)]
include!("generated/mod.rs");
pub(crate) use self::peer::Peer;
}
/// The peer ui module
pub mod ui;
pub use ui::{Headless, Tui, Ui};
/// The misc util module
pub mod util;
pub use util::{
decode_unknown_protobuf, extract_ip_multiaddr, ipaddr_to_multiaddr, is_private_ip,
pretty_print_fields, split_peer_id, WireType,
};
/// Prelude module
pub mod prelude {
pub use super::*;
}

71
rust-peer/src/log.rs Normal file
View File

@@ -0,0 +1,71 @@
use std::fmt;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{
field::{Field, Visit},
Event, Level, Subscriber,
};
use tracing_subscriber::{
filter::EnvFilter, layer::Context, prelude::*, registry::LookupSpan, Layer,
};
// Custom tracing layer to send log events over mpsc
struct MpscLayer {
sender: Sender<Message>,
}
/// Custom tracing event that is send and sync
#[derive(Clone, Debug)]
pub struct Message {
/// The log level of the event
pub level: Level,
/// The log message of the event
pub message: String,
}
// Implement a visitor to extract fields from the event
struct FieldVisitor {
message: Option<String>,
}
impl Visit for FieldVisitor {
fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
if field.name() == "message" {
self.message = Some(format!("{:?}", value));
}
}
}
impl<S> Layer<S> for MpscLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let mut visitor = FieldVisitor { message: None };
event.record(&mut visitor);
let event_data = Message {
level: *event.metadata().level(),
message: visitor.message.unwrap_or_default(),
};
let _ = self.sender.try_send(event_data);
}
}
/// Async tracing logger wrapper that filters and feeds log messages over an mpsc channel for
/// integration into the TUI gui.
pub struct Log;
impl Log {
/// Starts the logger and returns the task handle and receiver for the log messages.
pub fn init() -> Receiver<Message> {
let (sender, receiver) = mpsc::channel(16);
let filter = EnvFilter::from_default_env();
let layer = MpscLayer { sender }.with_filter(filter);
tracing_subscriber::registry().with(layer).init();
receiver
}
}

View File

@@ -1,401 +0,0 @@
mod protocol;
use anyhow::{Context, Result};
use clap::Parser;
use futures::future::{select, Either};
use futures::StreamExt;
use libp2p::{
core::muxing::StreamMuxerBox,
gossipsub, identify, identity,
kad::store::MemoryStore,
kad::{Behaviour as Kademlia, Config as KademliaConfig},
memory_connection_limits,
multiaddr::{Multiaddr, Protocol},
relay,
request_response::{self, ProtocolSupport},
swarm::{NetworkBehaviour, Swarm, SwarmEvent},
PeerId, StreamProtocol, SwarmBuilder, Transport,
};
use libp2p_webrtc as webrtc;
use libp2p_webrtc::tokio::Certificate;
use log::{debug, error, info, warn};
use protocol::FileExchangeCodec;
use std::net::IpAddr;
use std::path::Path;
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
time::Duration,
};
use tokio::fs;
use crate::protocol::FileRequest;
const TICK_INTERVAL: Duration = Duration::from_secs(15);
const KADEMLIA_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0");
const FILE_EXCHANGE_PROTOCOL: StreamProtocol =
StreamProtocol::new("/universal-connectivity-file/1");
const PORT_WEBRTC: u16 = 9090;
const PORT_QUIC: u16 = 9091;
const LOCAL_KEY_PATH: &str = "./local_key";
const LOCAL_CERT_PATH: &str = "./cert.pem";
const GOSSIPSUB_CHAT_TOPIC: &str = "universal-connectivity";
const GOSSIPSUB_CHAT_FILE_TOPIC: &str = "universal-connectivity-file";
const GOSSIPSUB_PEER_DISCOVERY: &str = "universal-connectivity-browser-peer-discovery";
const BOOTSTRAP_NODES: [&str; 4] = [
"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
];
#[derive(Debug, Parser)]
#[clap(name = "universal connectivity rust peer")]
struct Opt {
/// Address to listen on.
#[clap(long, default_value = "0.0.0.0")]
listen_address: IpAddr,
/// If known, the external address of this node. Will be used to correctly advertise our external address across all transports.
#[clap(long, env)]
external_address: Option<IpAddr>,
/// Nodes to connect to on startup. Can be specified several times.
#[clap(
long,
default_value = "/dns/universal-connectivity-rust-peer.fly.dev/udp/9091/quic-v1"
)]
connect: Vec<Multiaddr>,
}
/// An example WebRTC peer that will accept connections
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opt = Opt::parse();
let local_key = read_or_create_identity(Path::new(LOCAL_KEY_PATH))
.await
.context("Failed to read identity")?;
let webrtc_cert = read_or_create_certificate(Path::new(LOCAL_CERT_PATH))
.await
.context("Failed to read certificate")?;
let mut swarm = create_swarm(local_key, webrtc_cert)?;
let address_webrtc = Multiaddr::from(opt.listen_address)
.with(Protocol::Udp(PORT_WEBRTC))
.with(Protocol::WebRTCDirect);
let address_quic = Multiaddr::from(opt.listen_address)
.with(Protocol::Udp(PORT_QUIC))
.with(Protocol::QuicV1);
swarm
.listen_on(address_webrtc.clone())
.expect("listen on webrtc");
swarm
.listen_on(address_quic.clone())
.expect("listen on quic");
for addr in opt.connect {
if let Err(e) = swarm.dial(addr.clone()) {
debug!("Failed to dial {addr}: {e}");
}
}
for peer in &BOOTSTRAP_NODES {
let multiaddr: Multiaddr = peer.parse().expect("Failed to parse Multiaddr");
if let Err(e) = swarm.dial(multiaddr) {
debug!("Failed to dial {peer}: {e}");
}
}
let chat_topic_hash = gossipsub::IdentTopic::new(GOSSIPSUB_CHAT_TOPIC).hash();
let file_topic_hash = gossipsub::IdentTopic::new(GOSSIPSUB_CHAT_FILE_TOPIC).hash();
let peer_discovery_hash = gossipsub::IdentTopic::new(GOSSIPSUB_PEER_DISCOVERY).hash();
let mut tick = futures_timer::Delay::new(TICK_INTERVAL);
loop {
match select(swarm.next(), &mut tick).await {
Either::Left((event, _)) => match event.unwrap() {
SwarmEvent::NewListenAddr { address, .. } => {
if let Some(external_ip) = opt.external_address {
let external_address = address
.replace(0, |_| Some(external_ip.into()))
.expect("address.len > 1 and we always return `Some`");
swarm.add_external_address(external_address);
}
let p2p_address = address.with(Protocol::P2p(*swarm.local_peer_id()));
info!("Listening on {p2p_address}");
}
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
info!("Connected to {peer_id}");
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
warn!("Failed to dial {peer_id:?}: {error}");
}
SwarmEvent::IncomingConnectionError { error, .. } => {
warn!("{:#}", anyhow::Error::from(error))
}
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
warn!("Connection to {peer_id} closed: {cause:?}");
swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
info!("Removed {peer_id} from the routing table (if it was in there).");
}
SwarmEvent::Behaviour(BehaviourEvent::Relay(e)) => {
debug!("{:?}", e);
}
SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(
libp2p::gossipsub::Event::Message {
message_id: _,
propagation_source: _,
message,
},
)) => {
if message.topic == chat_topic_hash {
info!(
"Received message from {:?}: {}",
message.source,
String::from_utf8(message.data).unwrap()
);
continue;
}
if message.topic == file_topic_hash {
let file_id = String::from_utf8(message.data).unwrap();
info!("Received file {} from {:?}", file_id, message.source);
let request_id = swarm.behaviour_mut().request_response.send_request(
&message.source.unwrap(),
FileRequest {
file_id: file_id.clone(),
},
);
info!(
"Requested file {} to {:?}: req_id:{:?}",
file_id, message.source, request_id
);
continue;
}
if message.topic == peer_discovery_hash {
info!("Received peer discovery from {:?}", message.source);
continue;
}
error!("Unexpected gossipsub topic hash: {:?}", message.topic);
}
SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(
libp2p::gossipsub::Event::Subscribed { peer_id, topic },
)) => {
debug!("{peer_id} subscribed to {topic}");
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(e)) => {
info!("BehaviourEvent::Identify {:?}", e);
if let identify::Event::Error { peer_id, error, .. } = e {
match error {
libp2p::swarm::StreamUpgradeError::Timeout => {
// When a browser tab closes, we don't get a swarm event
// maybe there's a way to get this with TransportEvent
// but for now remove the peer from routing table if there's an Identify timeout
swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
info!("Removed {peer_id} from the routing table (if it was in there).");
}
_ => {
debug!("{error}");
}
}
} else if let identify::Event::Received {
info: identify::Info { observed_addr, .. },
..
} = e
{
debug!("identify::Event::Received observed_addr: {}", observed_addr);
// this should switch us from client to server mode in kademlia
swarm.add_external_address(observed_addr);
}
}
SwarmEvent::Behaviour(BehaviourEvent::Kademlia(e)) => {
debug!("Kademlia event: {:?}", e);
}
SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(
request_response::Event::Message { message, .. },
)) => match message {
request_response::Message::Request { request, .. } => {
//TODO: support ProtocolSupport::Full
debug!(
"umimplemented: request_response::Message::Request: {:?}",
request
);
}
request_response::Message::Response { response, .. } => {
info!(
"request_response::Message::Response: size:{}",
response.file_body.len()
);
// TODO: store this file (in memory or disk) and provider it via Kademlia
}
},
SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(
request_response::Event::OutboundFailure {
request_id, error, ..
},
)) => {
error!(
"request_response::Event::OutboundFailure for request {:?}: {:?}",
request_id, error
);
}
event => {
debug!("Other type of event: {:?}", event);
}
},
Either::Right(_) => {
tick = futures_timer::Delay::new(TICK_INTERVAL);
debug!(
"external addrs: {:?}",
swarm.external_addresses().collect::<Vec<&Multiaddr>>()
);
if let Err(e) = swarm.behaviour_mut().kademlia.bootstrap() {
debug!("Failed to run Kademlia bootstrap: {e:?}");
}
}
}
}
}
#[derive(NetworkBehaviour)]
struct Behaviour {
gossipsub: gossipsub::Behaviour,
identify: identify::Behaviour,
kademlia: Kademlia<MemoryStore>,
relay: relay::Behaviour,
request_response: request_response::Behaviour<FileExchangeCodec>,
connection_limits: memory_connection_limits::Behaviour,
}
fn create_swarm(
local_key: identity::Keypair,
certificate: Certificate,
) -> Result<Swarm<Behaviour>> {
let local_peer_id = PeerId::from(local_key.public());
debug!("Local peer id: {local_peer_id}");
// To content-address message, we can take the hash of message and use it as an ID.
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};
// Set a custom gossipsub configuration
let gossipsub_config = gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Permissive) // This sets the kind of message validation. The default is Strict (enforce message signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.mesh_outbound_min(1)
.mesh_n_low(1)
.flood_publish(true)
.build()
.expect("Valid config");
// build a gossipsub network behaviour
let mut gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(local_key.clone()),
gossipsub_config,
)
.expect("Correct configuration");
// Create/subscribe Gossipsub topics
gossipsub.subscribe(&gossipsub::IdentTopic::new(GOSSIPSUB_CHAT_TOPIC))?;
gossipsub.subscribe(&gossipsub::IdentTopic::new(GOSSIPSUB_CHAT_FILE_TOPIC))?;
gossipsub.subscribe(&gossipsub::IdentTopic::new(GOSSIPSUB_PEER_DISCOVERY))?;
let identify_config = identify::Behaviour::new(
identify::Config::new("/ipfs/0.1.0".into(), local_key.public())
.with_interval(Duration::from_secs(60)), // do this so we can get timeouts for dropped WebRTC connections
);
// Create a Kademlia behaviour.
let cfg = KademliaConfig::new(KADEMLIA_PROTOCOL_NAME);
let store = MemoryStore::new(local_peer_id);
let kad_behaviour = Kademlia::with_config(local_peer_id, store, cfg);
let behaviour = Behaviour {
gossipsub,
identify: identify_config,
kademlia: kad_behaviour,
relay: relay::Behaviour::new(
local_peer_id,
relay::Config {
max_reservations: usize::MAX,
max_reservations_per_peer: 100,
reservation_rate_limiters: Vec::default(),
circuit_src_rate_limiters: Vec::default(),
max_circuits: usize::MAX,
max_circuits_per_peer: 100,
..Default::default()
},
),
request_response: request_response::Behaviour::new(
[(FILE_EXCHANGE_PROTOCOL, ProtocolSupport::Full)],
request_response::Config::default(),
),
connection_limits: memory_connection_limits::Behaviour::with_max_percentage(0.9),
};
Ok(SwarmBuilder::with_existing_identity(local_key.clone())
.with_tokio()
.with_quic()
.with_other_transport(|id_keys| {
Ok(webrtc::tokio::Transport::new(id_keys.clone(), certificate)
.map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))))
})?
.with_dns()?
.with_behaviour(|_key| behaviour)?
.build())
}
async fn read_or_create_certificate(path: &Path) -> Result<Certificate> {
if path.exists() {
let pem = fs::read_to_string(&path).await?;
info!("Using existing certificate from {}", path.display());
return Ok(Certificate::from_pem(&pem)?);
}
let cert = Certificate::generate(&mut rand::thread_rng())?;
fs::write(&path, &cert.serialize_pem().as_bytes()).await?;
info!(
"Generated new certificate and wrote it to {}",
path.display()
);
Ok(cert)
}
async fn read_or_create_identity(path: &Path) -> Result<identity::Keypair> {
if path.exists() {
let bytes = fs::read(&path).await?;
info!("Using existing identity from {}", path.display());
return Ok(identity::Keypair::from_protobuf_encoding(&bytes)?); // This only works for ed25519 but that is what we are using.
}
let identity = identity::Keypair::generate_ed25519();
fs::write(&path, &identity.to_protobuf_encoding()?).await?;
info!("Generated new identity and wrote it to {}", path.display());
Ok(identity)
}

25
rust-peer/src/message.rs Normal file
View File

@@ -0,0 +1,25 @@
use crate::ChatPeer;
use libp2p::core::PeerId;
/// The different types of messages sent between the UI and the Peer
#[derive(Debug)]
pub enum Message {
/// Send chat message
Chat {
/// The peer sending the message
from: Option<ChatPeer>,
/// The message sent
data: Vec<u8>,
},
/// All gossipsub peers and their topics
AllPeers {
/// The peers and their topics
peers: Vec<(PeerId, Vec<String>)>,
},
/// Add a peer
AddPeer(ChatPeer),
/// Remove a peer
RemovePeer(ChatPeer),
/// Add an event message
Event(String),
}

59
rust-peer/src/options.rs Normal file
View File

@@ -0,0 +1,59 @@
use clap::Parser;
use std::{net::IpAddr, path::PathBuf};
const LISTEN_ADDR: [&str; 1] = ["0.0.0.0"];
const LOCAL_KEY_PATH: &str = "./local";
const LOCAL_CERT_PATH: &str = "./cert.pem";
/// The rust peer command line options
#[derive(Debug, Parser)]
#[clap(name = "universal connectivity rust peer")]
pub struct Options {
/// Address to listen on.
#[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',', default_values = LISTEN_ADDR)]
pub listen_addresses: Vec<IpAddr>,
/// If known, the external address of this node. Will be used to correctly advertise our external address across all transports.
#[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',')]
pub external_addresses: Vec<IpAddr>,
/// Nodes to connect to on startup. Can be specified several times.
#[clap(long, env, action = clap::ArgAction::Append, value_delimiter = ',')]
pub connect: Vec<String>,
/// If set, the path to the local certificate file.
#[clap(long, env, default_value = LOCAL_CERT_PATH)]
pub local_cert_path: PathBuf,
/// If set, the path to the local key file.
#[clap(long, env, default_value = LOCAL_KEY_PATH)]
pub local_key_path: PathBuf,
/// If set, the peer will make autonat client requests (default: true)
#[clap(long, env, default_value = "true")]
pub autonat_client: bool,
/// If set, the peer will act as an autonat server
#[clap(long, env)]
pub autonat_server: bool,
/// If set, the peer will try to upgrade connections using DCUtR (default: true)
#[clap(long, env, default_value = "true")]
pub dcutr: bool,
/// If set, the peer will not initialize the TUI and will run headless.
#[clap(long, env)]
pub headless: bool,
/// If set, the peer will use kademlia (default: true)
#[clap(long, env, default_value = "true")]
pub kademlia: bool,
/// If set, the peer will support relay client connections (default: true)
#[clap(long, env, default_value = "true")]
pub relay_client: bool,
/// If set the peer will act as a relay server
#[clap(long, env)]
pub relay_server: bool,
}

1134
rust-peer/src/peer.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,114 @@
#![allow(dead_code)]
use crate::{log::Message as LogMessage, ChatPeer, Message, Ui};
use async_trait::async_trait;
use libp2p::core::PeerId;
use signal_hook::{consts::SIGTERM, iterator::Signals};
use std::{collections::HashSet, time::Duration};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_util::sync::CancellationToken;
/// A headless UI for the peer
pub struct Headless {
// my peer id
me: ChatPeer,
// we receive log messages from the log thread
from_log: Receiver<LogMessage>,
// we send UI messages to the peer thread
to_peer: Sender<Message>,
// we receive UI messages from the peer thread
from_peer: Receiver<Message>,
// the shutdown token
shutdown: CancellationToken,
// the list of peers
peers: HashSet<ChatPeer>,
}
impl Headless {
/// Create a new UI instance
pub fn build(
me: PeerId,
from_log: Receiver<LogMessage>,
shutdown: CancellationToken,
) -> (Box<dyn Ui + Send>, Sender<Message>, Receiver<Message>) {
// create a new channels for sending/receiving messages
let (to_peer, from_ui) = mpsc::channel::<Message>(64);
let (to_ui, from_peer) = mpsc::channel::<Message>(64);
// create a new TUI instance
let ui: Box<dyn Ui> = Box::new(Self {
me: me.into(),
from_log,
to_peer,
from_peer,
shutdown,
peers: HashSet::new(),
});
(ui, to_ui, from_ui)
}
}
#[async_trait]
impl Ui for Headless {
/// Run the UI
async fn run(&mut self) -> anyhow::Result<()> {
// Register the SIGTERM signal
let mut signals = Signals::new([SIGTERM])?;
println!("Headless UI started");
println!("Press Ctrl+C to exit");
println!("My peer id: {} ({})", self.me.id(), self.me);
// Main loop
'main: loop {
// Process log messages
if let Ok(log) = self.from_log.try_recv() {
//TODO: remove this after [PR 5966](https://github.com/libp2p/rust-libp2p/pull/5966)
if !log.message.starts_with("Can't send data channel") {
println!("{}", log.message);
}
}
// Process peer messages
if let Ok(ui_message) = self.from_peer.try_recv() {
match ui_message {
Message::Chat { from, data } => {
let from = from.map_or("Unknown".to_string(), |peer| peer.to_string());
let message =
String::from_utf8(data).unwrap_or("Invalid UTF-8".to_string());
println!("{}: {}", from, message);
}
Message::AddPeer(peer) => {
if self.peers.insert(peer) {
println!(
"Adding peer:\n\tpeer id: {}\n\tname: {}",
peer.id(),
peer.name()
);
}
}
Message::RemovePeer(peer) => {
if self.peers.remove(&peer) {
println!("Removing peer: {peer:?}");
}
}
Message::Event(event) => {
println!("{}", event);
}
_ => {}
}
}
// check if we have received the shutdown signal from the OS
if signals.pending().next() == Some(SIGTERM) {
println!("Received SIGTERM, shutting down");
self.shutdown.cancel();
break 'main;
}
tokio::time::sleep(Duration::from_millis(18)).await;
}
Ok(())
}
}

15
rust-peer/src/ui/mod.rs Normal file
View File

@@ -0,0 +1,15 @@
/// the async UI trait
/// the async UI trait
#[async_trait::async_trait]
pub trait Ui: Send {
/// Run the UI
async fn run(&mut self) -> anyhow::Result<()>;
}
/// the TUI implementation
pub mod tui;
pub use tui::Tui;
/// the headless implementation
pub mod headless;
pub use headless::Headless;

477
rust-peer/src/ui/tui.rs Normal file
View File

@@ -0,0 +1,477 @@
use crate::{log::Message as LogMessage, ChatPeer, Message, Ui};
use async_trait::async_trait;
use crossterm::{
event::{
self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEvent, KeyModifiers,
MouseEvent, MouseEventKind,
},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use libp2p::core::PeerId;
use ratatui::{
backend::CrosstermBackend,
layout::{Constraint, Direction, Layout},
prelude::{Buffer, Rect, Widget},
style::{Modifier, Style},
text::{Line, Span},
widgets::{Block, Borders, List, ListItem, Paragraph},
Terminal,
};
use std::{
collections::{HashSet, VecDeque},
io,
option::Option,
time::Duration,
};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
/// A simple UI for the peer
pub struct Tui {
// my peer id
me: ChatPeer,
// we receive log messages from the log thread
from_log: Receiver<LogMessage>,
// we send UI messages to the peer thread
to_peer: Sender<Message>,
// we receive UI messages from the peer thread
from_peer: Receiver<Message>,
// the shutdown token
shutdown: CancellationToken,
}
impl Tui {
/// Create a new UI instance
pub fn build(
me: PeerId,
from_log: Receiver<LogMessage>,
shutdown: CancellationToken,
) -> (Box<dyn Ui + Send>, Sender<Message>, Receiver<Message>) {
// create a new channels for sending/receiving messages
let (to_peer, from_ui) = mpsc::channel::<Message>(64);
let (to_ui, from_peer) = mpsc::channel::<Message>(64);
// create a new TUI instance
let ui: Box<dyn Ui> = Box::new(Self {
me: me.into(),
from_log,
to_peer,
from_peer,
shutdown,
});
(ui, to_ui, from_ui)
}
}
#[async_trait]
impl Ui for Tui {
/// Run the UI
async fn run(&mut self) -> anyhow::Result<()> {
// the currently selected tab
let mut selected_tab = 0;
// TUI setup
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
// Log Widget
let mut log_widget = LinesWidget::new("Log", 200);
// Chat Widget
let mut chat_widget = ChatWidget::new(&self.me);
// Main loop
loop {
// Process log messages
if let Ok(log) = self.from_log.try_recv() {
//TODO: remove this after [PR 5966](https://github.com/libp2p/rust-libp2p/pull/5966)
if !log.message.starts_with("Can't send data channel") {
log_widget.add_line(log.message);
}
}
// Process peer messages
if let Ok(ui_message) = self.from_peer.try_recv() {
match ui_message {
Message::Chat { from, data } => {
let message =
String::from_utf8(data).unwrap_or("Invalid UTF-8".to_string());
chat_widget.add_chat(from, message);
}
Message::AllPeers { peers } => {
for (peer, topics) in peers {
let mut peer_str = format!("{peer}: ");
for topic in topics {
peer_str.push_str(&format!("\n\t{}, ", topic));
}
info!("{peer_str}");
}
}
Message::AddPeer(peer) => {
if chat_widget.peers.insert(peer) {
chat_widget.add_event(format!(
"Adding peer:\n\tpeer id: {}\n\tname: {}",
peer.id(),
peer.name()
));
}
}
Message::RemovePeer(peer) => {
if chat_widget.peers.remove(&peer) {
chat_widget.add_event(format!("Removing peer: {peer:?}"));
}
}
Message::Event(event) => {
chat_widget.add_event(event);
}
}
}
// Draw the UI
terminal.draw(|f| match selected_tab {
0 => f.render_widget(&mut chat_widget, f.area()),
1 => f.render_widget(&mut log_widget, f.area()),
_ => {}
})?;
// Handle input events
if event::poll(Duration::from_millis(18))? {
match event::read()? {
Event::Key(key) => match key {
// Handle ctrl+c
KeyEvent {
code: KeyCode::Char('c'),
modifiers: KeyModifiers::CONTROL,
..
} => {
info!("Received Ctrl+C, shutting down...");
self.shutdown.cancel();
break;
}
// Handle ctrl+shift+p
KeyEvent {
code: KeyCode::Char('p'),
modifiers: KeyModifiers::CONTROL | KeyModifiers::SHIFT,
..
} => {
error!("all peers sent");
self.to_peer
.send(Message::AllPeers { peers: vec![] })
.await?;
}
// Handle all other key events
_ => match key.code {
KeyCode::Tab => {
selected_tab = (selected_tab + 1) % 2;
}
KeyCode::Char(c) if selected_tab == 0 => {
chat_widget.input.push(c);
}
KeyCode::Backspace if selected_tab == 0 => {
chat_widget.input.pop();
}
KeyCode::Enter if selected_tab == 0 => {
error!("chat sent");
// send the chat message to the swarm to be gossiped
self.to_peer
.send(Message::Chat {
from: Some(self.me),
data: chat_widget.input.clone().into_bytes(),
})
.await?;
// add our chat to the local chat widget
chat_widget.add_chat(Some(self.me), chat_widget.input.clone());
// clear the input
chat_widget.input.clear();
}
_ => {}
},
},
Event::Mouse(event) => match selected_tab {
0 => {
let _ = chat_widget.mouse_event(event);
}
1 => {
let _ = log_widget.mouse_event(event);
}
_ => {}
},
_ => {}
}
}
}
// Cleanup
disable_raw_mode()?;
execute!(io::stdout(), LeaveAlternateScreen, DisableMouseCapture)?;
Ok(())
}
}
// Function to wrap text into multiple lines based on a max width
fn wrap_text(text: &str, max_width: usize) -> Vec<Line> {
let mut lines = Vec::new();
// split the message into lines to preserve any newlines in the message
for line in text.lines() {
// Convert tabs to 2 spaces
let processed_line = line.replace('\t', " ");
// find any leading whitespace
let leading_whitespace = processed_line
.chars()
.take_while(|c| c.is_whitespace())
.collect::<String>();
// split into words for wrapping
let words = processed_line.split_whitespace().collect::<Vec<&str>>();
let mut current_line = String::new();
for word in words {
// Check if adding the word to the current line will exceed the max width
if current_line.len() + word.len() + (if current_line.is_empty() { 0 } else { 1 })
> max_width
{
if !current_line.is_empty() {
// add the current line to the lines
lines.push(Line::from(Span::raw(current_line)));
current_line = String::new();
}
// handle words that are longer than the max width
if word.len() > max_width {
let mut remaining = word;
while !remaining.is_empty() {
let split_point = if remaining.len() > max_width {
max_width
} else {
remaining.len()
};
let (chunk, rest) = remaining.split_at(split_point);
let l = format!("{}{}", leading_whitespace, chunk);
lines.push(Line::from(Span::raw(l)));
remaining = rest;
}
} else {
current_line = format!("{}{}", leading_whitespace, word);
}
} else {
// add the word to the current line
if current_line.is_empty() {
current_line.push_str(&leading_whitespace);
} else {
current_line.push(' ');
}
current_line.push_str(word);
}
}
if !current_line.is_empty() {
lines.push(Line::from(Span::raw(current_line)));
}
}
lines
}
// Lines Widget
struct LinesWidget {
title: String,
max: usize,
lines: VecDeque<String>,
scroll: usize,
area: Rect,
}
impl LinesWidget {
// Create a new LogWidget instance
fn new(title: impl Into<String>, max: usize) -> Self {
Self {
title: title.into(),
max,
lines: VecDeque::new(),
scroll: 0,
area: Rect::default(),
}
}
// Handle a mouse event
fn mouse_event(&mut self, event: MouseEvent) -> bool {
// check if the event happened in our area
let x = event.column;
let y = event.row;
if x >= self.area.x
&& x < self.area.x + self.area.width
&& y >= self.area.y
&& y < self.area.y + self.area.height
{
match event.kind {
MouseEventKind::ScrollUp => {
self.scroll += 1;
}
MouseEventKind::ScrollDown => {
if self.scroll > 0 {
self.scroll -= 1;
}
}
_ => {}
}
true
} else {
false
}
}
// Add a line to the widget
fn add_line(&mut self, line: impl Into<String>) {
self.lines.push_back(line.into());
if self.lines.len() > self.max {
self.lines.drain(0..(self.lines.len() - self.max));
}
}
}
impl Widget for &mut LinesWidget {
fn render(self, area: Rect, buf: &mut Buffer) {
let block = Block::default()
.title(self.title.as_str())
.borders(Borders::ALL)
.style(Style::default());
self.area = block.inner(area);
let inner_area = self.area;
let max_lines = inner_area.height as usize;
let mut logs: Vec<ListItem> = self
.lines
.iter()
.flat_map(|l| {
let wrapped_lines = wrap_text(l, inner_area.width as usize - 2);
wrapped_lines
.into_iter()
.map(ListItem::new)
.collect::<Vec<_>>()
})
.collect();
if logs.len() > max_lines {
if logs.len() > (max_lines + self.scroll) {
logs.drain(0..(logs.len() - max_lines - self.scroll));
} else {
self.scroll = max_lines;
}
}
List::new(logs).block(block).render(area, buf);
}
}
// Chat Widget
struct ChatWidget<'a> {
me: &'a ChatPeer,
peers: HashSet<ChatPeer>,
chat: LinesWidget,
events: LinesWidget,
input: String,
}
impl<'a> ChatWidget<'a> {
// Create a new ChatWidget instance
fn new(me: &'a ChatPeer) -> Self {
let mut peers = HashSet::new();
peers.insert(*me);
ChatWidget {
me,
peers,
chat: LinesWidget::new("Chat", 100),
events: LinesWidget::new("System", 100),
input: String::new(),
}
}
// Handle a mouse event
fn mouse_event(&mut self, event: MouseEvent) -> bool {
self.chat.mouse_event(event) || self.events.mouse_event(event)
}
// Add a chat message to the widget
fn add_chat(&mut self, peer: Option<ChatPeer>, message: impl Into<String>) {
let peer = peer.map_or("Unknown".to_string(), |p| p.to_string());
self.chat.add_line(format!("{}: {}", peer, message.into()));
}
// Add an event message to the widget
fn add_event(&mut self, event: impl Into<String>) {
self.events.add_line(event);
}
}
impl Widget for &mut ChatWidget<'_> {
fn render(self, area: Rect, buf: &mut Buffer) {
// Renders a layout with three rows, the top row is 50% of the height, the middle row is
// 50% of the height and the bottom row is 1 line hight. The top row contains two columns,
// the second column is 18 characters wide and the first column fills the remaining space.
// The second row contains the LogWidget showing event messages. The bottom row is a chat
// input line that starts with "> ".
let layout = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Percentage(50),
Constraint::Percentage(50),
Constraint::Length(1),
]
.as_ref(),
)
.split(area);
// calculate the layout for the top row
let top_layout = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(100), Constraint::Length(24)].as_ref())
.split(layout[0]);
// render the chat messages
self.chat.render(top_layout[0], buf);
// render the peers list
let peers_block = Block::default()
.title("Peers")
.borders(Borders::ALL)
.style(Style::default());
let peers: Vec<ListItem> = self
.peers
.iter()
.map(|p| {
if p == self.me {
ListItem::new(Span::styled(
format!("{} (You)", p),
Style::default().add_modifier(Modifier::ITALIC),
))
} else {
ListItem::new(Span::raw(p.to_string()))
}
})
.collect();
List::new(peers)
.block(peers_block)
.render(top_layout[1], buf);
// render the events messages
self.events.render(layout[1], buf);
// render the chat input
Paragraph::new(format!("{} > {}", self.me, self.input.clone())).render(layout[2], buf);
}
}

189
rust-peer/src/util.rs Normal file
View File

@@ -0,0 +1,189 @@
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
use quick_protobuf::reader::BytesReader;
use std::{convert::TryFrom, fmt, net::IpAddr};
/// Define protobuf wire types since they are no longer in quick-protobuf
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WireType {
/// Varint wire type
Varint = 0,
/// Fixed64 wire type
Fixed64 = 1,
/// Length-delimited wire type
LengthDelimited = 2,
/// Start group wire type
StartGroup = 3,
/// End group wire type
EndGroup = 4,
/// Fixed32 wire type
Fixed32 = 5,
}
/// Error type for TryFrom conversion
#[derive(Debug)]
pub struct InvalidWireType(u32);
impl fmt::Display for InvalidWireType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Invalid wire type value: {}", self.0)
}
}
impl std::error::Error for InvalidWireType {}
impl TryFrom<u32> for WireType {
type Error = InvalidWireType;
fn try_from(tag: u32) -> Result<Self, Self::Error> {
// Extract wire type from the lower 3 bits
let wire_type_value = tag & 0x07;
match wire_type_value {
0 => Ok(WireType::Varint),
1 => Ok(WireType::Fixed64),
2 => Ok(WireType::LengthDelimited),
3 => Ok(WireType::StartGroup),
4 => Ok(WireType::EndGroup),
5 => Ok(WireType::Fixed32),
invalid => Err(InvalidWireType(invalid)),
}
}
}
/// Decode an unknown protobuf message into a list of fields
pub fn decode_unknown_protobuf(bytes: &[u8]) -> anyhow::Result<Vec<String>> {
let mut reader = BytesReader::from_bytes(bytes);
let mut fields = Vec::new();
// Read the next tag
while let Ok(tag) = reader.next_tag(bytes) {
// Extract field number and wire type
let field_number = tag >> 3;
let wire_type = WireType::try_from(tag).map_err(|e| {
quick_protobuf::Error::Message(format!("Invalid wire type value: {}", e.0))
})?;
// Decode the value based on wire type
let value = match wire_type {
WireType::Varint => {
let varint = reader.read_varint64(bytes)?;
format!("int64: {}", varint) // Could also be int32, uint32, etc.
}
WireType::Fixed64 => {
let fixed64 = reader.read_fixed64(bytes)?;
format!("fixed64: {}", fixed64) // Could also be double
}
WireType::LengthDelimited => {
let len = reader.read_varint32(bytes)? as usize;
let data = reader.read_bytes(bytes)?;
// Try to interpret as string; if it fails, treat as raw bytes
match std::str::from_utf8(data) {
Ok(s) => format!("string: \"{}\"", s),
Err(_) => format!("bytes({}): {}", len, hex::encode(data)),
}
}
WireType::Fixed32 => {
let fixed32 = reader.read_fixed32(bytes)?;
format!("fixed32: {}", fixed32) // Could also be float
}
WireType::StartGroup | WireType::EndGroup => {
// Groups are deprecated and rare; skip for simplicity
return Err(
quick_protobuf::Error::Message("Groups not supported".to_string()).into(),
);
}
};
fields.push(format!(
"Field {} ({:?}): {}",
field_number, wire_type, value
));
}
Ok(fields)
}
/// Pretty print a list of fields
pub fn pretty_print_fields(fields: &[String]) -> String {
let mut output = String::new();
output.push_str("Decoded Protobuf Message {\n");
for field in fields {
output.push_str(" ");
output.push_str(field);
output.push('\n');
}
output.push('}');
output
}
/// Split the PeerId from a Multiaddr
pub fn split_peer_id(multiaddr: Multiaddr) -> Option<(Multiaddr, PeerId)> {
let mut base_addr = Multiaddr::empty();
let mut peer_id = None;
// Iterate over the protocols in the Multiaddr
for protocol in multiaddr.into_iter() {
if let Protocol::P2p(id) = protocol {
peer_id = Some(id);
break; // Stop once we find the P2p component
} else {
base_addr.push(protocol); // Add non-P2p components to the base address
}
}
peer_id.map(|id| (base_addr, id))
}
/// Extract the IP address from a Multiaddr
pub fn extract_ip_multiaddr(multiaddr: &Multiaddr) -> Option<Multiaddr> {
let mut result = Multiaddr::empty();
for component in multiaddr.into_iter() {
match component {
Protocol::Ip4(addr) => {
result.push(Protocol::Ip4(addr));
return Some(result);
}
Protocol::Ip6(addr) => {
result.push(Protocol::Ip6(addr));
return Some(result);
}
_ => continue,
}
}
None
}
/// Check if a Multiaddr contains a private IP address
pub fn is_private_ip(multiaddr: &Multiaddr) -> bool {
for component in multiaddr.into_iter() {
match component {
Protocol::Ip4(addr) => {
return addr.is_private() || // 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
addr.is_loopback() || // 127.0.0.0/8
addr.is_link_local() || // 169.254.0.0/16
addr.is_unspecified(); // 0.0.0.0
}
Protocol::Ip6(addr) => {
return addr.is_loopback() || // ::1
addr.is_unspecified() || // ::
// Unique Local Address (fc00::/7 where 8th bit is 1)
(addr.segments()[0] & 0xfe00 == 0xfc00) ||
// Link-Local unicast (fe80::/10)
(addr.segments()[0] & 0xffc0 == 0xfe80);
}
_ => continue,
}
}
false
}
/// Convert an IP address to a Multiaddr
pub fn ipaddr_to_multiaddr(ip: &IpAddr) -> Multiaddr {
let multiaddr = match ip {
IpAddr::V4(ipv4) => Multiaddr::empty().with(Protocol::Ip4(*ipv4)),
IpAddr::V6(ipv6) => Multiaddr::empty().with(Protocol::Ip6(*ipv6)),
};
multiaddr
}