feat: add pubsub browser peer discovery (#133)

* rust-peer: do not add externally observed maddrs

Might fix #131

* feat: subscribe go peer to peer discovery topic

* feat: add pubsub peer discovery to js peer

* fix: build

* fix: use peer:discovery event to hook to peer discovery events

* fix: remove idb datastore

no real reason to use it for now

* feat: dial peers discovered through pubsub

* chore: simplify code and move out fn

* feat: autodial discovered with connection manager

* chore: remove connection manager logs

* refactor: simplify code further

---------

Co-authored-by: Daniel N <2color@users.noreply.github.com>
This commit is contained in:
Daniel Norman
2024-05-07 12:24:55 +02:00
committed by GitHub
parent 2246b251a1
commit bcce1fbb80
10 changed files with 145 additions and 113 deletions

View File

@@ -15,6 +15,12 @@ import (
// ChatRoomBufSize is the number of incoming messages to buffer for each topic.
const ChatRoomBufSize = 128
// Topic used to broadcast browser WebRTC addresses
const PubSubDiscoveryTopic string = "universal-connectivity-browser-peer-discovery"
const ChatTopic string = "universal-connectivity"
const ChatFileTopic string = "universal-connectivity-file"
// ChatRoom represents a subscription to a single PubSub topic. Messages
// can be published to the topic with ChatRoom.Publish, and received
// messages are pushed to the Messages channel.
@@ -23,13 +29,15 @@ type ChatRoom struct {
Messages chan *ChatMessage
SysMessages chan *ChatMessage
ctx context.Context
h host.Host
ps *pubsub.PubSub
chatTopic *pubsub.Topic
chatSub *pubsub.Subscription
fileTopic *pubsub.Topic
fileSub *pubsub.Subscription
ctx context.Context
h host.Host
ps *pubsub.PubSub
chatTopic *pubsub.Topic
chatSub *pubsub.Subscription
fileTopic *pubsub.Topic
fileSub *pubsub.Subscription
peerDiscoveryTopic *pubsub.Topic
peerDiscoverySub *pubsub.Subscription
roomName string
nick string
@@ -44,9 +52,9 @@ type ChatMessage struct {
// JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning
// a ChatRoom on success.
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string, roomName string) (*ChatRoom, error) {
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string) (*ChatRoom, error) {
// join the pubsub chatTopic
chatTopic, err := ps.Join(chatTopicName(roomName))
chatTopic, err := ps.Join(ChatTopic)
if err != nil {
return nil, err
}
@@ -58,7 +66,7 @@ func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname
}
// join the pubsub fileTopic
fileTopic, err := ps.Join(fileTopicName(roomName))
fileTopic, err := ps.Join(ChatFileTopic)
if err != nil {
return nil, err
}
@@ -69,18 +77,31 @@ func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname
return nil, err
}
// join the pubsub peer disovery topic
peerDiscoveryTopic, err := ps.Join(PubSubDiscoveryTopic)
if err != nil {
return nil, err
}
// and subscribe to it
peerDiscoverySub, err := peerDiscoveryTopic.Subscribe()
if err != nil {
return nil, err
}
cr := &ChatRoom{
ctx: ctx,
h: h,
ps: ps,
chatTopic: chatTopic,
chatSub: chatSub,
fileTopic: fileTopic,
fileSub: fileSub,
nick: nickname,
roomName: roomName,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
SysMessages: make(chan *ChatMessage, ChatRoomBufSize),
ctx: ctx,
h: h,
ps: ps,
chatTopic: chatTopic,
chatSub: chatSub,
fileTopic: fileTopic,
fileSub: fileSub,
peerDiscoveryTopic: peerDiscoveryTopic,
peerDiscoverySub: peerDiscoverySub,
nick: nickname,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
SysMessages: make(chan *ChatMessage, ChatRoomBufSize),
}
// start reading messages from the subscription in a loop
@@ -94,7 +115,7 @@ func (cr *ChatRoom) Publish(message string) error {
}
func (cr *ChatRoom) ListPeers() []peer.ID {
return cr.ps.ListPeers(chatTopicName(cr.roomName))
return cr.ps.ListPeers(ChatTopic)
}
// readLoop pulls messages from the pubsub chat/file topic and handles them.
@@ -187,13 +208,3 @@ func (cr *ChatRoom) requestFile(toPeer peer.ID, fileID []byte) ([]byte, error) {
return fileBody, nil
}
// chatTopicName returns the name of the pubsub topic for the chat room.
func chatTopicName(roomName string) string {
return roomName
}
// fileTopicName returns the name of the pubsub topic used for sending/recieving files in the chat room.
func fileTopicName(roomName string) string {
return fmt.Sprintf("%s-file", roomName)
}

View File

@@ -72,10 +72,12 @@ func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Mult
}
// Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) {
// Only used by Go peer to find each other.
// TODO: since this isn't implemented on the Rust or the JS side, can probably be removed
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
routingDiscovery := routing.NewRoutingDiscovery(dht)
discovery.Advertise(ctx, routingDiscovery, rendezvous)
discovery.Advertise(ctx, routingDiscovery, DiscoveryServiceTag)
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
@@ -86,7 +88,7 @@ func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous str
return
case <-ticker.C:
peers, err := discovery.FindPeers(ctx, routingDiscovery, rendezvous)
peers, err := discovery.FindPeers(ctx, routingDiscovery, DiscoveryServiceTag)
if err != nil {
panic(err)
}
@@ -115,7 +117,6 @@ func LogMsgf(f string, msg ...any) {
func main() {
// parse some flags to set our nickname and the room to join
nickFlag := flag.String("nick", "", "nickname to use in chat. will be generated if empty")
roomFlag := flag.String("room", "universal-connectivity", "name of chat room to join")
idPath := flag.String("identity", "identity.key", "path to the private key (PeerID) file")
certPath := flag.String("tls-cert-path", "", "path to the tls cert file (for websockets)")
keyPath := flag.String("tls-key-path", "", "path to the tls key file (for websockets")
@@ -194,11 +195,8 @@ func main() {
nick = defaultNick(h.ID())
}
// join the room from the cli flag, or the flag default
room := *roomFlag
// join the chat room
cr, err := JoinChatRoom(ctx, h, ps, nick, room)
cr, err := JoinChatRoom(ctx, h, ps, nick)
if err != nil {
panic(err)
}
@@ -213,7 +211,7 @@ func main() {
}
// setup peer discovery
go Discover(ctx, h, dht, "universal-connectivity")
go Discover(ctx, h, dht)
// setup local mDNS discovery
if err := setupDiscovery(h); err != nil {

View File

@@ -18,11 +18,11 @@
"@libp2p/identify": "^1.0.19",
"@libp2p/interface-pubsub": "^4.0.1",
"@libp2p/kad-dht": "^12.0.13",
"@libp2p/pubsub-peer-discovery": "^10.0.2",
"@libp2p/webrtc": "^4.0.28",
"@libp2p/websockets": "^8.0.20",
"@libp2p/webtransport": "^4.0.28",
"@multiformats/multiaddr": "^12.2.1",
"datastore-idb": "^2.1.9",
"debug": "^4.3.4",
"it-length-prefixed": "^9.0.4",
"it-map": "^3.1.0",
@@ -3115,6 +3115,37 @@
"uint8arrays": "^5.0.2"
}
},
"node_modules/@libp2p/pubsub-peer-discovery": {
"version": "10.0.2",
"resolved": "https://registry.npmjs.org/@libp2p/pubsub-peer-discovery/-/pubsub-peer-discovery-10.0.2.tgz",
"integrity": "sha512-7DLasMSo443nxPJ+X95tXazXgO96K2/TafoexDxi4QVWIKgkmK+HyoFRcmwog2pjhA1/KQUsPu8S8wH6Ns9Oow==",
"dependencies": {
"@libp2p/interface": "^1.0.1",
"@libp2p/interface-internal": "^1.0.1",
"@libp2p/peer-id": "^4.0.1",
"@multiformats/multiaddr": "^12.0.0",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.9"
}
},
"node_modules/@libp2p/pubsub-peer-discovery/node_modules/multiformats": {
"version": "12.1.3",
"resolved": "https://registry.npmjs.org/multiformats/-/multiformats-12.1.3.tgz",
"integrity": "sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==",
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@libp2p/pubsub-peer-discovery/node_modules/uint8arrays": {
"version": "4.0.10",
"resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-4.0.10.tgz",
"integrity": "sha512-AnJNUGGDJAgFw/eWu/Xb9zrVKEGlwJJCaeInlf3BkecE/zcTobk5YXYIPNQJO1q5Hh1QZrQQHf0JvcHqz2hqoA==",
"dependencies": {
"multiformats": "^12.0.1"
}
},
"node_modules/@libp2p/pubsub/node_modules/multiformats": {
"version": "13.1.0",
"resolved": "https://registry.npmjs.org/multiformats/-/multiformats-13.1.0.tgz",
@@ -5901,18 +5932,6 @@
"it-take": "^3.0.4"
}
},
"node_modules/datastore-idb": {
"version": "2.1.9",
"resolved": "https://registry.npmjs.org/datastore-idb/-/datastore-idb-2.1.9.tgz",
"integrity": "sha512-o1LAE2VgVMeEWOP/zHYHV6MetGzbN+C72nRRUFwPXk0xLopsPN9wlH73D3De3pyDZKoKp2875XDFpRkDJ8mGWA==",
"dependencies": {
"datastore-core": "^9.0.0",
"idb": "^8.0.0",
"interface-datastore": "^8.0.0",
"it-filter": "^3.0.4",
"it-sort": "^3.0.4"
}
},
"node_modules/dayjs": {
"version": "1.11.10",
"resolved": "https://registry.npmjs.org/dayjs/-/dayjs-1.11.10.tgz",
@@ -7660,11 +7679,6 @@
"node": ">=10.17.0"
}
},
"node_modules/idb": {
"version": "8.0.0",
"resolved": "https://registry.npmjs.org/idb/-/idb-8.0.0.tgz",
"integrity": "sha512-l//qvlAKGmQO31Qn7xdzagVPPaHTxXx199MhrAFuVBTPqydcPYBWjkrbv4Y0ktB+GmWOiwHl237UUOrLmQxLvw=="
},
"node_modules/ieee754": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz",

View File

@@ -19,11 +19,11 @@
"@libp2p/identify": "^1.0.19",
"@libp2p/interface-pubsub": "^4.0.1",
"@libp2p/kad-dht": "^12.0.13",
"@libp2p/pubsub-peer-discovery": "^10.0.2",
"@libp2p/webrtc": "^4.0.28",
"@libp2p/websockets": "^8.0.20",
"@libp2p/webtransport": "^4.0.28",
"@multiformats/multiaddr": "^12.2.1",
"datastore-idb": "^2.1.9",
"debug": "^4.3.4",
"it-length-prefixed": "^9.0.4",
"it-map": "^3.1.0",

View File

@@ -1,7 +1,7 @@
import React, { createContext, useContext, useEffect, useState } from 'react';
import { useLibp2pContext } from './ctx';
import type { Message } from '@libp2p/interface'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL } from '@/lib/constants'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL, PUBSUB_PEER_DISCOVERY } from '@/lib/constants'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { pipe } from 'it-pipe'
@@ -58,8 +58,11 @@ export const ChatProvider = ({ children }: any) => {
chatFileMessageCB(evt, topic, data)
break
}
case PUBSUB_PEER_DISCOVERY: {
break
}
default: {
throw new Error(`Unexpected gossipsub topic: ${topic}`)
console.error(`Unexpected event %o on gossipsub topic: ${topic}`, evt)
}
}
}

View File

@@ -5,7 +5,6 @@ import { PeerId } from '@libp2p/interface'
export interface PeerStats {
peerIds: PeerId[]
connected: boolean
connections: Connection[]
latency: number
}
@@ -17,7 +16,6 @@ export interface PeerContextInterface {
export const peerContext = createContext<PeerContextInterface>({
peerStats: {
peerIds: [],
connected: true,
connections: [],
latency: 0
},
@@ -31,7 +29,6 @@ export const usePeerContext = () => {
export const PeerProvider = ({ children }: { children: ReactNode }) => {
const [peerStats, setPeerStats] = useState<PeerStats>({
peerIds: [],
connected: false,
connections: [],
latency: 0
});

View File

@@ -1,5 +1,6 @@
export const CHAT_TOPIC = "universal-connectivity"
export const CHAT_FILE_TOPIC = "universal-connectivity-file"
export const PUBSUB_PEER_DISCOVERY = "universal-connectivity-browser-peer-discovery"
export const FILE_EXCHANGE_PROTOCOL = "/universal-connectivity-file/1"
export const CIRCUIT_RELAY_CODE = 290

View File

@@ -1,4 +1,3 @@
import { IDBDatastore } from 'datastore-idb'
import {
createDelegatedRoutingV1HttpApiClient,
DelegatedRoutingV1HttpApiClient,
@@ -11,28 +10,24 @@ import { yamux } from '@chainsafe/libp2p-yamux'
import { bootstrap } from '@libp2p/bootstrap'
import { Multiaddr } from '@multiformats/multiaddr'
import { sha256 } from 'multiformats/hashes/sha2'
import type { Message, SignedMessage, PeerId } from '@libp2p/interface'
import type { Connection, Message, SignedMessage, PeerId } from '@libp2p/interface'
import { gossipsub } from '@chainsafe/libp2p-gossipsub'
import { webSockets } from '@libp2p/websockets'
import { webTransport } from '@libp2p/webtransport'
import { webRTC, webRTCDirect } from '@libp2p/webrtc'
import { BOOTSTRAP_PEER_IDS, CHAT_FILE_TOPIC, CHAT_TOPIC } from './constants'
import { circuitRelayTransport } from '@libp2p/circuit-relay-v2'
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
import { BOOTSTRAP_PEER_IDS, CHAT_FILE_TOPIC, CHAT_TOPIC, PUBSUB_PEER_DISCOVERY } from './constants'
import first from 'it-first'
export async function startLibp2p() {
// enable verbose logging in browser console to view debug logs
localStorage.debug = 'libp2p*,-*:trace'
// application-specific data lives in the datastore
const datastore = new IDBDatastore('universal-connectivity')
await datastore.open()
localStorage.debug = 'libp2p*,-libp2p:connection-manager*,-*:trace'
const delegatedClient = createDelegatedRoutingV1HttpApiClient('https://delegated-ipfs.dev')
const { bootstrapAddrs, relayListenAddrs } = await getBootstrapMultiaddrs(delegatedClient)
const libp2p = await createLibp2p({
datastore,
addresses: {
listen: [
// 👇 Listen for webRTC connection
@@ -54,6 +49,7 @@ export async function startLibp2p() {
],
},
}),
// 👇 Required to estalbish connections with peers supporting WebRTC-direct, e.g. the Rust-peer
webRTCDirect(),
// 👇 Required to create circuit relay reservations in order to hole punch browser-to-browser WebRTC connections
circuitRelayTransport({
@@ -62,8 +58,8 @@ export async function startLibp2p() {
}),
],
connectionManager: {
maxConnections: 10,
minConnections: 0,
maxConnections: 30,
minConnections: 5,
},
connectionEncryption: [noise()],
streamMuxers: [yamux()],
@@ -71,6 +67,11 @@ export async function startLibp2p() {
denyDialMultiaddr: async () => false,
},
peerDiscovery: [
pubsubPeerDiscovery({
interval: 10_000,
topics: [PUBSUB_PEER_DISCOVERY],
listenOnly: false,
}),
bootstrap({
// The app-specific go and rust bootstrappers use WebTransport and WebRTC-direct which have ephemeral multiadrrs
// that are resolved above using the delegated routing API
@@ -93,15 +94,24 @@ export async function startLibp2p() {
libp2p.services.pubsub.subscribe(CHAT_TOPIC)
libp2p.services.pubsub.subscribe(CHAT_FILE_TOPIC)
// .catch((e) => {
// console.log('woot', e)
// })
libp2p.addEventListener('self:peer:update', ({ detail: { peer } }) => {
const multiaddrs = peer.addresses.map(({ multiaddr }) => multiaddr)
console.log(`changed multiaddrs: peer ${peer.id.toString()} multiaddrs: ${multiaddrs}`)
})
// 👇 explicitly dialling peers discovered via pubsub is only necessary
// when minConnections is set to 0 and the connection manager doesn't autodial
// libp2p.addEventListener('peer:discovery', (event) => {
// const { multiaddrs, id } = event.detail
// if (libp2p.getConnections(id)?.length > 0) {
// console.log(`Already connected to peer %s. Will not try dialling`, id)
// return
// }
// dialWebRTCMaddrs(libp2p, multiaddrs)
// })
return libp2p
}
@@ -116,6 +126,23 @@ export async function msgIdFnStrictNoSign(msg: Message): Promise<Uint8Array> {
return await sha256.encode(encodedSeqNum)
}
// Function which dials one maddr at a time to avoid establishing multiple connections to the same peer
async function dialWebRTCMaddrs(libp2p: Libp2p, multiaddrs: Multiaddr[]): Promise<void> {
// Filter webrtc (browser-to-browser) multiaddrs
const webRTCMadrs = multiaddrs.filter((maddr) => maddr.protoNames().includes('webrtc'))
console.log(`peer:discovery with maddrs: %o`, webRTCMadrs)
for (const addr of webRTCMadrs) {
try {
console.log(`woot attempting to dial webrtc multiaddr: %o`, addr)
await libp2p.dial(addr)
return // if we succeed dialing the peer, no need to try another address
} catch (error) {
console.error(`woot failed to dial webrtc multiaddr: %o`, addr)
}
}
}
export const connectToMultiaddr = (libp2p: Libp2p) => async (multiaddr: Multiaddr) => {
console.log(`dialling: ${multiaddr.toString()}`)
try {
@@ -133,7 +160,9 @@ export const connectToMultiaddr = (libp2p: Libp2p) => async (multiaddr: Multiadd
async function getBootstrapMultiaddrs(
client: DelegatedRoutingV1HttpApiClient,
): Promise<BootstrapsMultiaddrs> {
const peers = await Promise.all(BOOTSTRAP_PEER_IDS.map(peerId => first(client.getPeers(peerIdFromString(peerId)))))
const peers = await Promise.all(
BOOTSTRAP_PEER_IDS.map((peerId) => first(client.getPeers(peerIdFromString(peerId)))),
)
const bootstrapAddrs = []
const relayListenAddrs = []
@@ -166,3 +195,9 @@ interface BootstrapsMultiaddrs {
// Constructs a multiaddr string representing the circuit relay v2 listen address for a relayed connection to the given peer.
const getRelayListenAddr = (maddr: Multiaddr, peer: PeerId): string =>
`${maddr.toString()}/p2p/${peer.toString()}/p2p-circuit`
export const getFormattedConnections = (connections: Connection[]) =>
connections.map((conn) => ({
peerId: conn.remotePeer,
protocols: [...new Set(conn.remoteAddr.protoNames())],
}))

View File

@@ -8,7 +8,7 @@ import { usePeerContext } from '../context/peer-ctx'
import { useCallback, useEffect, useState } from 'react'
import Image from 'next/image'
import { multiaddr } from '@multiformats/multiaddr'
import { connectToMultiaddr } from '../lib/libp2p'
import { connectToMultiaddr, getFormattedConnections } from '../lib/libp2p'
import { useListenAddressesContext } from '../context/listen-addresses-ctx'
import Spinner from '@/components/spinner'
@@ -28,7 +28,6 @@ export default function Home() {
...peerStats,
peerIds: connections.map((conn) => conn.remotePeer),
connections: connections,
connected: connections.length > 0,
})
}, 1000)
@@ -42,7 +41,6 @@ export default function Home() {
const multiaddrs = libp2p.getMultiaddrs()
setListenAddresses({
...listenAddresses,
multiaddrs,
})
}, 1000)
@@ -52,32 +50,6 @@ export default function Home() {
}
}, [libp2p, listenAddresses, setListenAddresses])
type PeerProtoTuple = {
peerId: PeerId
protocols: string[]
}
const getFormattedConnections = (connections: Connection[]): PeerProtoTuple[] => {
const protoNames: Map<PeerId, string[]> = new Map()
connections.forEach((conn) => {
const exists = protoNames.get(conn.remotePeer)
const dedupedProtonames = [...new Set(conn.remoteAddr.protoNames())]
if (exists?.length) {
const namesToAdd = dedupedProtonames.filter((name) => !exists.includes(name))
// console.log('namesToAdd: ', namesToAdd)
protoNames.set(conn.remotePeer, [...exists, ...namesToAdd])
} else {
protoNames.set(conn.remotePeer, dedupedProtonames)
}
})
return [...protoNames.entries()].map(([peerId, protocols]) => ({
peerId,
protocols,
}))
}
const handleConnectToMultiaddr = useCallback(
async (e: React.MouseEvent<HTMLButtonElement>) => {
@@ -181,7 +153,7 @@ export default function Home() {
</div>
<div className="my-4 inline-flex items-center text-xl">
Connected:{' '}
{peerStats.connected ? (
{peerStats.connections.length > 0 ? (
<CheckCircleIcon className="inline w-6 h-6 text-green-500" />
) : (
<XCircleIcon className="w-6 h-6 text-red-500" />
@@ -198,7 +170,7 @@ export default function Home() {
<ul className="divide-y divide-gray-200">
{getFormattedConnections(peerStats.connections).map((pair) => (
<li
key={pair.peerId.toString()}
key={`${pair.peerId.toString()}-${pair.protocols.join('-')}`}
className="py-1 flex flex-wrap justify-between items-center break-all"
>
<span>{`${pair.peerId.toString()} (${pair.protocols.join(', ')})`}</span>

View File

@@ -217,7 +217,8 @@ async fn main() -> Result<()> {
{
debug!("identify::Event::Received observed_addr: {}", observed_addr);
swarm.add_external_address(observed_addr);
// Disable to see if it's the cause of the wrong multiaddrs getting announced
// swarm.add_external_address(observed_addr);
// TODO: The following should no longer be necessary after https://github.com/libp2p/rust-libp2p/pull/4371.
if protocols.iter().any(|p| p == &KADEMLIA_PROTOCOL_NAME) {