[go-peer]: roomName handling + used as service name for mDNS discovery (#322)

* feat: handle room flag

* feat: roomName acts like a service name for mDNS discovery

---------

Co-authored-by: dozyio <github@dozy.io>
This commit is contained in:
Alex Mylonas
2025-11-09 17:23:18 +02:00
committed by GitHub
parent 6b6b8c41a9
commit 0671a706d3
2 changed files with 33 additions and 24 deletions

View File

@@ -18,11 +18,6 @@ const ChatRoomBufSize = 128
// Topic used to broadcast browser WebRTC addresses // Topic used to broadcast browser WebRTC addresses
const PubSubDiscoveryTopic string = "universal-connectivity-browser-peer-discovery" const PubSubDiscoveryTopic string = "universal-connectivity-browser-peer-discovery"
const (
ChatTopic string = "universal-connectivity"
ChatFileTopic string = "universal-connectivity-file"
)
// ChatRoom represents a subscription to a single PubSub topic. Messages // ChatRoom represents a subscription to a single PubSub topic. Messages
// can be published to the topic with ChatRoom.Publish, and received // can be published to the topic with ChatRoom.Publish, and received
// messages are pushed to the Messages channel. // messages are pushed to the Messages channel.
@@ -54,9 +49,9 @@ type ChatMessage struct {
// JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning // JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning
// a ChatRoom on success. // a ChatRoom on success.
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string) (*ChatRoom, error) { func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string, roomName string) (*ChatRoom, error) {
// join the pubsub chatTopic // join the pubsub chatTopic
chatTopic, err := ps.Join(ChatTopic) chatTopic, err := ps.Join(roomName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -68,7 +63,8 @@ func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname
} }
// join the pubsub fileTopic // join the pubsub fileTopic
fileTopic, err := ps.Join(ChatFileTopic) fileTopicName := roomName + "-file"
fileTopic, err := ps.Join(fileTopicName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -102,6 +98,7 @@ func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname
peerDiscoveryTopic: peerDiscoveryTopic, peerDiscoveryTopic: peerDiscoveryTopic,
peerDiscoverySub: peerDiscoverySub, peerDiscoverySub: peerDiscoverySub,
nick: nickname, nick: nickname,
roomName: roomName,
Messages: make(chan *ChatMessage, ChatRoomBufSize), Messages: make(chan *ChatMessage, ChatRoomBufSize),
SysMessages: make(chan *ChatMessage, ChatRoomBufSize), SysMessages: make(chan *ChatMessage, ChatRoomBufSize),
} }
@@ -117,7 +114,7 @@ func (cr *ChatRoom) Publish(message string) error {
} }
func (cr *ChatRoom) ListPeers() []peer.ID { func (cr *ChatRoom) ListPeers() []peer.ID {
return cr.ps.ListPeers(ChatTopic) return cr.ps.ListPeers(cr.roomName)
} }
// readLoop pulls messages from the pubsub chat/file topic and handles them. // readLoop pulls messages from the pubsub chat/file topic and handles them.

View File

@@ -5,6 +5,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"math" "math"
"math/rand"
"os" "os"
"time" "time"
@@ -34,8 +35,9 @@ import (
// DiscoveryInterval is how often we re-publish our mDNS records. // DiscoveryInterval is how often we re-publish our mDNS records.
const DiscoveryInterval = time.Hour const DiscoveryInterval = time.Hour
// DiscoveryServiceTag is used in our mDNS advertisements to discover other chat peers. // DefaultRoom is used as the gossipsub topic to join and the DiscoveryServiceTag in mDNS advertisements.
const DiscoveryServiceTag = "universal-connectivity" // It can be overridden by the -room flag. The concept of different rooms is only supported by mDNS in Go.
const DefaultRoom = "universal-connectivity"
var SysMsgChan chan *ChatMessage var SysMsgChan chan *ChatMessage
@@ -63,10 +65,10 @@ func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Mult
// Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7 // Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7
// Only used by Go peer to find each other. // Only used by Go peer to find each other.
// TODO: since this isn't implemented on the Rust or the JS side, can probably be removed // TODO: since this isn't implemented on the Rust or the JS side, can probably be removed
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT) { func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, tag string) {
routingDiscovery := routing.NewRoutingDiscovery(dht) routingDiscovery := routing.NewRoutingDiscovery(dht)
discovery.Advertise(ctx, routingDiscovery, DiscoveryServiceTag) discovery.Advertise(ctx, routingDiscovery, tag)
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop() defer ticker.Stop()
@@ -77,7 +79,7 @@ func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
return return
case <-ticker.C: case <-ticker.C:
peers, err := discovery.FindPeers(ctx, routingDiscovery, DiscoveryServiceTag) peers, err := discovery.FindPeers(ctx, routingDiscovery, tag)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -107,6 +109,7 @@ func main() {
// parse some flags to set our nickname and the room to join // parse some flags to set our nickname and the room to join
nickFlag := flag.String("nick", "", "nickname to use in chat. will be generated if empty") nickFlag := flag.String("nick", "", "nickname to use in chat. will be generated if empty")
idPath := flag.String("identity", "identity.key", "path to the private key (PeerID) file") idPath := flag.String("identity", "identity.key", "path to the private key (PeerID) file")
roomFlag := flag.String("room", DefaultRoom, "the gossipsub topic / room to join (mDNS only)")
headless := flag.Bool("headless", false, "run without chat UI") headless := flag.Bool("headless", false, "run without chat UI")
port := flag.String("port", "9095", "port to listen on") port := flag.String("port", "9095", "port to listen on")
@@ -229,7 +232,7 @@ func main() {
} }
// join the chat room // join the chat room
cr, err := JoinChatRoom(ctx, h, ps, nick) cr, err := JoinChatRoom(ctx, h, ps, nick, *roomFlag)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -244,10 +247,10 @@ func main() {
} }
// setup peer discovery // setup peer discovery
go Discover(ctx, h, hDHT) go Discover(ctx, h, hDHT, *roomFlag)
// setup local mDNS discovery // setup local mDNS discovery
if err = setupDiscovery(h); err != nil { if err := setupDiscovery(h, *roomFlag); err != nil {
panic(err) panic(err)
} }
@@ -352,18 +355,27 @@ type discoveryNotifee struct {
// the PubSub system will automatically start interacting with them if they also // the PubSub system will automatically start interacting with them if they also
// support PubSub. // support PubSub.
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) { func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
LogMsgf("discovered new peer %s", pi.ID.String()) LogMsgf("mDNS discovered new peer %s", pi.ID.String())
err := n.h.Connect(context.Background(), pi)
if err != nil { go func(pi peer.AddrInfo) {
LogMsgf("error connecting to peer %s: %s", pi.ID.String(), err) // add 1 second jitter to avoid all peers connecting at the same time
} time.Sleep(time.Duration(1+rand.Intn(999)) * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := n.h.Connect(ctx, pi)
if err != nil {
LogMsgf("error connecting to mDNS peer %s: %s", pi.ID.String(), err)
}
}(pi)
} }
// setupDiscovery creates an mDNS discovery service and attaches it to the libp2p Host. // setupDiscovery creates an mDNS discovery service and attaches it to the libp2p Host.
// This lets us automatically discover peers on the same LAN and connect to them. // This lets us automatically discover peers on the same LAN and connect to them.
func setupDiscovery(h host.Host) error { func setupDiscovery(h host.Host, serviceName string) error {
// setup mDNS discovery to find local peers // setup mDNS discovery to find local peers
s := mdns.NewMdnsService(h, DiscoveryServiceTag, &discoveryNotifee{h: h}) s := mdns.NewMdnsService(h, serviceName, &discoveryNotifee{h: h})
return s.Start() return s.Start()
} }