feat: add Node.js peer (#268)

Adds a basic Node.js peer with a terminal user interface.

Lifts some code from the existing js-peer. Doesn't support DMs or dialing peers directly (yet).

Fixes #214

---------

Co-authored-by: Daniel N <2color@users.noreply.github.com>
This commit is contained in:
Alex Potsides
2025-04-22 13:31:44 +01:00
committed by GitHub
parent 1f74148f60
commit 24e2960c39
24 changed files with 9225 additions and 6 deletions

View File

@@ -20,11 +20,12 @@ Some of the cool and cutting-edge [transport protocols](https://connectivity.lib
## Packages
| Package | Description | WebTransport | WebRTC | WebRTC-direct | QUIC | TCP |
| :-------------------------- | :------------------------------ | ------------ | ------ | ------------- | ---- | --- |
| [`js-peer`](./js-peer/) | Browser Chat Peer in TypeScript | ✅ | ✅ | ✅ | ❌ | ❌ |
| [`go-peer`](./go-peer/) | Chat peer implemented in Go | ✅ | | ✅ | ✅ | ✅ |
| [`rust-peer`](./rust-peer/) | Chat peer implemented in Rust | | ❌ | ✅ | ✅ | |
| Package | Description | WebTransport | WebRTC | WebRTC-direct | QUIC | TCP |
| :-------------------------------- | :------------------------------ | ------------ | ------ | ------------- | ---- | --- |
| [`js-peer`](./js-peer/) | Browser Chat Peer in TypeScript | ✅ | ✅ | ✅ | ❌ | ❌ |
| [`node-js-peer`](./node-js-peer/) | Node.js Chat Peer in TypeScript | ✅ | | ✅ | ✅ | ✅ |
| [`go-peer`](./go-peer/) | Chat peer implemented in Go | | ❌ | ✅ | ✅ | |
| [`rust-peer`](./rust-peer/) | Chat peer implemented in Rust | ❌ | ❌ | ✅ | ✅ | ❌ |
✅ - Protocol supported
❌ - Protocol not supported
@@ -42,7 +43,7 @@ There are two ways to connect to a peer:
Load the UI, and enter the multiaddr into the UI. Ensure that it includes the peerID, e.g.`/ip4/192.168.178.21/udp/61838/quic-v1/webtransport/certhash/uEiCQCALYac4V3LJ2ourLdauXOswIXpIuJ_JNT-8Wavmxyw/certhash/uEiCdYghq5FlXGkVONQXT07CteA16BDyMPI23-0GjA9Ej_w/p2p/12D3KooWF7ovRNBKPxERf6GtUbFdiqJsQviKUb7Z8a2Uuuo6MrDX`
## Getting started: JS
## Getting started: Browser JS
### 1. Install dependencies
@@ -61,6 +62,21 @@ Start the dev server:
npm run dev
```
## Getting started: Node.js
### 1. Install dependencies
```
cd node-js-peer
npm i
```
### 2. Start the app
```
npm start
```
## Getting started: Rust
```

1
node-js-peer/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.dist

40
node-js-peer/README.md Normal file
View File

@@ -0,0 +1,40 @@
# Node.js peer
This is a JavaScript peer for the Universal Connectivity app implemented as a
command line app using a Terminal User Interface aimed at Node.js.
The TUI is implemented using [react-curse](https://www.npmjs.com/package/react-curse),
a JavaScript so should be familiar to anyone who has used [React](https://react.dev/) before.
## Getting Started
To start the app run:
```bash
npm start
# or
yarn start
# or
pnpm start
```
You should see a terminal user interface similar to this:
![Node.js peer terminal user interface](./assets/tui.png)
Use `CTRL-C` to exit the app.
## Hacking
You can start editing the app by modifying [./App.tsx](./App.tsx) and restarting the app.
The libp2p configuration can be found in [./lib/libp2p.ts](./lib/libp2p.ts).
## Learn More
To learn more about libp2p, take a look at the following resources:
- [js-libp2p on GitHub](https://github.com/libp2p/js-libp2p) - The js-libp2p repo
- [API docs](https://libp2p.github.io/js-libp2p/) - API documentation
- [Docs](https://github.com/libp2p/js-libp2p/tree/main/doc) - Longer form docs
- [Examples](https://github.com/libp2p/js-libp2p-examples) - How to do almost anything with your libp2p node

BIN
node-js-peer/assets/tui.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

7802
node-js-peer/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

41
node-js-peer/package.json Normal file
View File

@@ -0,0 +1,41 @@
{
"name": "universal-connectivity-node-js-peer",
"type": "module",
"scripts": {
"start": "npx esbuild src/App.tsx --outfile=.dist/index.js --bundle --platform=node --format=esm --external:'./node_modules/*' --sourcemap && node --enable-source-maps .dist",
"dist": "npx esbuild src/App.tsx --outfile=.dist/index.cjs --bundle --platform=node --define:'process.env.NODE_ENV=\"production\"' --minify --tree-shaking=true",
"generate": "protons protobuf/*.proto",
"format": "prettier --write src",
"lint": "prettier . --check"
},
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^14.1.1",
"@chainsafe/libp2p-noise": "^16.1.0",
"@chainsafe/libp2p-quic": "^1.1.1",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@helia/delegated-routing-v1-http-api-client": "^4.2.2",
"@libp2p/bootstrap": "^11.0.33",
"@libp2p/circuit-relay-v2": "^3.2.9",
"@libp2p/identify": "^3.0.28",
"@libp2p/interface": "^2.8.0",
"@libp2p/interface-internal": "^2.3.10",
"@libp2p/kad-dht": "^15.0.0",
"@libp2p/ping": "^2.0.28",
"@libp2p/pubsub-peer-discovery": "^11.0.1",
"@libp2p/tcp": "^10.1.9",
"@libp2p/webrtc": "^5.2.10",
"@libp2p/websockets": "^9.2.9",
"it-protobuf-stream": "^2.0.1",
"libp2p": "^2.8.3",
"multiformats": "^13.3.2",
"protons-runtime": "^5.5.0",
"react": "^18.3.1",
"react-curse": "^1.0.0",
"uint8arraylist": "^2.4.8"
},
"devDependencies": {
"@types/node": "^22.14.0",
"@types/react": "^18.0.27",
"protons": "^7.6.0"
}
}

55
node-js-peer/src/App.tsx Normal file
View File

@@ -0,0 +1,55 @@
import React from 'react'
import ReactCurse, { Banner, useSize } from 'react-curse'
import { AppWrapper } from './context/index.js'
import { PeerList } from './components/peer-list.js'
import { layout } from './lib/position.js'
import { SendMessage } from './components/send-message.js'
import Messages from './components/messages.js'
const App = () => {
const dims = useSize()
let title = 'Universal Connectivity Node.js Peer'
if (dims.width < 140) {
title = 'UC Node.js Peer'
}
if (dims.width < 50) {
dims.width = 50
}
if (dims.height < 30) {
dims.height = 30
}
return (
<>
<Banner>{title}</Banner>
<Messages
x={0}
y={layout.bannerHeight}
width={dims.width - layout.peerListWidth - layout.margin}
height={dims.height - layout.bannerHeight - layout.inputHeight - layout.margin - layout.margin}
/>
<PeerList
x={dims.width - layout.peerListWidth}
y={layout.bannerHeight}
width={layout.peerListWidth - layout.margin}
height={dims.height - layout.bannerHeight - layout.inputHeight - layout.margin - layout.margin}
/>
<SendMessage
x={0}
y={dims.height - layout.inputHeight - layout.margin}
width={dims.width - layout.margin}
height={layout.inputHeight}
/>
</>
)
}
ReactCurse.render((
<AppWrapper>
<App />
</AppWrapper>
))

View File

@@ -0,0 +1,18 @@
import React from 'react'
import { Text } from 'react-curse'
interface Props {
error?: string
}
export function Booting({ error }: Props) {
if (error) {
return (
<Text>Failed to start - {error}</Text>
)
}
return (
<Text>...libp2p is starting</Text>
)
}

View File

@@ -0,0 +1,31 @@
import React from 'react'
import { Text } from 'react-curse'
import { useLibp2pContext } from '../context/index.js'
import { ChatMessage } from '../context/chat.js'
import { useMarkAsRead } from '../hooks/mark-as-read.js'
import { peerIdFromString } from '@libp2p/peer-id'
import { peerColor } from '../lib/peer-color.js'
interface Props extends ChatMessage {
dm: boolean
children: any
}
export const Message = ({ msgId, msg, peerId, read, dm, receivedAt }: Props) => {
const { libp2p } = useLibp2pContext()
const p = peerIdFromString(peerId)
const isSelf = libp2p.peerId.equals(p)
const timestamp = new Date(receivedAt).toLocaleString()
useMarkAsRead(msgId, peerId, read, dm)
const color = isSelf ? undefined : `#${peerColor(p)}`
return (
<>
<Text color={color} block={true}>{timestamp} - {peerId.substring(peerId.length - 7)} - {msg}</Text>
</>
)
}

View File

@@ -0,0 +1,58 @@
import React, { useEffect, useState } from 'react'
import { Text, Frame, View } from 'react-curse'
import { useLibp2pContext } from '../context/index.js'
import { PUBLIC_CHAT_ROOM_ID } from '../constants.js'
import { ChatMessage, useChatContext } from '../context/chat.js'
import { Message } from './message.js'
import { Logger } from '@libp2p/interface'
import { layout, PositionProps } from '../lib/position.js'
import { shortPeerId } from '../lib/short-peer-id.js'
let log: Logger
export default function Messages(props: PositionProps) {
const { libp2p } = useLibp2pContext()
const { roomId, setRoomId } = useChatContext()
const { messageHistory, setMessageHistory, directMessages, setDirectMessages } = useChatContext()
const [ messages, setMessages ] = useState<ChatMessage[]>([])
log = log ?? libp2p.logger.forComponent('chat')
const handleBackToPublic = () => {
setRoomId(PUBLIC_CHAT_ROOM_ID)
setMessages(messageHistory)
}
useEffect(() => {
// assumes a chat room is a peerId thus a direct message
if (roomId === PUBLIC_CHAT_ROOM_ID) {
setMessages(messageHistory)
} else {
setMessages(directMessages[roomId] || [])
}
}, [roomId, directMessages, messageHistory])
const title = roomId === PUBLIC_CHAT_ROOM_ID ? `Public chat (${shortPeerId(libp2p.peerId)})` : `DM (${shortPeerId(libp2p.peerId)} x ${shortPeerId(roomId)})`
return (
<>
<Frame absolute={true} {...props}>
<View>{
messages.map(({ msgId, msg, peerId, read, receivedAt }) => (
<Message
key={msgId}
dm={roomId !== PUBLIC_CHAT_ROOM_ID}
msg={msg}
peerId={peerId}
read={read}
msgId={msgId}
receivedAt={receivedAt}
children={[]}
/>
))
}</View>
</Frame>
<Text absolute={true} x={props.x + layout.margin} y={props.y}>{title}</Text>
</>
)
}

View File

@@ -0,0 +1,42 @@
import { useLibp2pContext } from '../context/index.js'
import { CHAT_TOPIC } from '../constants.js'
import React, { useEffect, useState } from 'react'
import { Peer } from './peer.js'
import type { PeerId } from '@libp2p/interface'
import { Text, Frame, View } from 'react-curse'
import { PositionProps } from '../index.js'
export function PeerList(props: PositionProps) {
const { libp2p } = useLibp2pContext()
const [subscribers, setSubscribers] = useState<PeerId[]>([])
useEffect(() => {
const onSubscriptionChange = () => {
setSubscribers(libp2p.services.pubsub.getSubscribers(CHAT_TOPIC))
}
onSubscriptionChange()
libp2p.services.pubsub.addEventListener('subscription-change', onSubscriptionChange)
return () => {
libp2p.services.pubsub.removeEventListener('subscription-change', onSubscriptionChange)
}
}, [libp2p, setSubscribers])
return (
<>
<Frame absolute={true} {...props}>
<View>
{/* Have to specify empty children prop - https://github.com/infely/react-curse/pull/9 */}
<Peer key={libp2p?.peerId?.toString()} peer={libp2p?.peerId} self={true} children={[]} />
{subscribers.map((p) => (
<Peer key={p.toString()} peer={p} self={false} children={[]} />
))}
</View>
</Frame>
<Text absolute={true} x={props.x + 2} y={props.y}>
Topic Peers ({subscribers.length})
</Text>
</>
)
}

View File

@@ -0,0 +1,24 @@
import React from 'react'
import { PeerId } from '@libp2p/interface'
import { useChatContext } from '../context/chat.js'
import { Text } from 'react-curse'
import { shortPeerId } from '../lib/short-peer-id.js'
import { peerColor } from '../lib/peer-color.js'
export interface PeerProps {
peer: PeerId
self: boolean
children: any
}
export function Peer({ peer, self }: PeerProps) {
const { directMessages } = useChatContext()
const dmCount = directMessages[peer.toString()]?.length
const color = peerColor(peer)
return (
<Text color={`#${color}`} block={true}>
{shortPeerId(peer)} {self ? '(You)' : ''} {dmCount ? `(${dmCount})` : ''}
</Text>
)
}

View File

@@ -0,0 +1,86 @@
import React, { useCallback } from 'react'
import { Text, Frame, Input } from 'react-curse'
import { useLibp2pContext } from '../context/index.js'
import { CHAT_TOPIC, PUBLIC_CHAT_ROOM_ID } from '../constants.js'
import { layout, PositionProps } from '../lib/position.js'
import { ChatMessage, useChatContext } from '../context/chat.js'
import { peerIdFromString } from '@libp2p/peer-id'
import { Logger } from '@libp2p/interface'
let log: Logger
export function SendMessage(props: PositionProps) {
const { libp2p } = useLibp2pContext()
const { roomId, messageHistory, setMessageHistory, directMessages, setDirectMessages } = useChatContext()
log ??= libp2p.logger.forComponent('chat:send-message')
const onSubmit = (text) => {
if (roomId === PUBLIC_CHAT_ROOM_ID) {
sendPublicMessage(text)
} else {
sendDirectMessage(text)
}
}
// Send message to public chat over gossipsub
const sendPublicMessage = useCallback(async (input: string) => {
if (input === '') {
return
}
await libp2p.services.pubsub.publish(CHAT_TOPIC, new TextEncoder().encode(input))
const myPeerId = libp2p.peerId.toString()
setMessageHistory([
...messageHistory,
{
msgId: crypto.randomUUID(),
msg: input,
peerId: myPeerId,
read: true,
receivedAt: Date.now(),
},
])
}, [messageHistory, libp2p, setMessageHistory])
// Send direct message over custom protocol
const sendDirectMessage = useCallback(async (input: string) => {
try {
const res = await libp2p.services.directMessage.send(peerIdFromString(roomId), input)
if (!res) {
log('Failed to send message')
return
}
const myPeerId = libp2p.peerId.toString()
const newMessage: ChatMessage = {
msgId: crypto.randomUUID(),
msg: input,
peerId: myPeerId,
read: true,
receivedAt: Date.now(),
}
const updatedMessages = directMessages[roomId] ? [...directMessages[roomId], newMessage] : [newMessage]
setDirectMessages({
...directMessages,
[roomId]: updatedMessages,
})
} catch (e: any) {
log.error('error sending message - %e', e)
}
}, [libp2p, setDirectMessages, directMessages, roomId])
return (
<>
<Frame absolute={true} {...props}>
<Input onSubmit={onSubmit}></Input>
</Frame>
<Text absolute={true} x={props.x + layout.margin} y={props.y}>Send message</Text>
</>
)
}

View File

@@ -0,0 +1,13 @@
export const CHAT_TOPIC = 'universal-connectivity'
export const CHAT_FILE_TOPIC = 'universal-connectivity-file'
export const PUBSUB_PEER_DISCOVERY = 'universal-connectivity-browser-peer-discovery'
export const DIRECT_MESSAGE_PROTOCOL = '/universal-connectivity/dm/1.0.0'
export const CIRCUIT_RELAY_CODE = 290
export const MIME_TEXT_PLAIN = 'text/plain'
export const PUBLIC_CHAT_ROOM_ID = ''
// 👇 App specific dedicated bootstrap PeerIDs
// Their multiaddrs are ephemeral so peer routing is used to resolve multiaddr
export const BOOTSTRAP_PEER_IDS = [
'12D3KooWFhXabKDwALpzqMbto94sB7rvmZ6M28hs9Y9xSopDKwQr'
]

View File

@@ -0,0 +1,149 @@
import React, { createContext, useContext, useEffect, useState } from 'react'
import { useLibp2pContext } from './index.js'
import type { Logger, Message } from '@libp2p/interface'
import {
CHAT_TOPIC,
MIME_TEXT_PLAIN,
PUBSUB_PEER_DISCOVERY,
} from '../constants.js'
import { DirectMessageEvent, directMessageEvent } from '../lib/direct-message.js'
let log: Logger
export interface ChatMessage {
msgId: string
msg: string
peerId: string
read: boolean
receivedAt: number
}
export interface DirectMessages {
[peerId: string]: ChatMessage[]
}
type Chatroom = string
export interface ChatContextInterface {
messageHistory: ChatMessage[]
setMessageHistory: (messageHistory: ChatMessage[] | ((prevMessages: ChatMessage[]) => ChatMessage[])) => void
directMessages: DirectMessages
setDirectMessages: (directMessages: DirectMessages | ((prevMessages: DirectMessages) => DirectMessages)) => void
roomId: Chatroom
setRoomId: (chatRoom: Chatroom) => void
}
export const chatContext = createContext<ChatContextInterface>({
messageHistory: [],
setMessageHistory: () => {},
directMessages: {},
setDirectMessages: () => {},
roomId: '',
setRoomId: () => {}
})
export const useChatContext = () => {
return useContext(chatContext)
}
export const ChatProvider = ({ children }: any) => {
const [messageHistory, setMessageHistory] = useState<ChatMessage[]>([])
const [directMessages, setDirectMessages] = useState<DirectMessages>({})
const [roomId, setRoomId] = useState<Chatroom>('')
const { libp2p } = useLibp2pContext()
log = log ?? libp2p.logger.forComponent('chat-context')
const messageCB = (evt: CustomEvent<Message>) => {
const { topic, data } = evt.detail
switch (topic) {
case CHAT_TOPIC: {
chatMessageCB(evt, topic, data)
break
}
case PUBSUB_PEER_DISCOVERY: {
break
}
default: {
log.error('Unexpected event %o on gossipsub topic: %s', evt, topic)
}
}
}
const chatMessageCB = (evt: CustomEvent<Message>, topic: string, data: Uint8Array) => {
const msg = new TextDecoder().decode(data)
log(`${topic}: ${msg}`)
// Append signed messages, otherwise discard
if (evt.detail.type === 'signed') {
setMessageHistory([
...messageHistory,
{
msgId: crypto.randomUUID(),
msg,
peerId: evt.detail.from.toString(),
read: false,
receivedAt: Date.now(),
},
])
}
}
useEffect(() => {
const handleDirectMessage = (evt: CustomEvent<DirectMessageEvent>) => {
const peerId = evt.detail.connection.remotePeer.toString()
if (evt.detail.type !== MIME_TEXT_PLAIN) {
throw new Error(`unexpected message type: ${evt.detail.type}`)
}
const message: ChatMessage = {
msg: evt.detail.content,
read: false,
msgId: crypto.randomUUID(),
peerId: peerId,
receivedAt: Date.now(),
}
const updatedMessages = directMessages[peerId] ? [...directMessages[peerId], message] : [message]
setDirectMessages({
...directMessages,
[peerId]: updatedMessages,
})
}
libp2p.services.directMessage.addEventListener(directMessageEvent, handleDirectMessage)
return () => {
libp2p.services.directMessage.removeEventListener(directMessageEvent, handleDirectMessage)
}
}, [directMessages, libp2p.services.directMessage, setDirectMessages])
useEffect(() => {
libp2p.services.pubsub.addEventListener('message', messageCB)
return () => {
;(async () => {
// Cleanup handlers 👇
libp2p.services.pubsub.removeEventListener('message', messageCB)
})()
}
})
return (
<chatContext.Provider
value={{
roomId,
setRoomId,
messageHistory,
setMessageHistory,
directMessages,
setDirectMessages
}}
>
{children}
</chatContext.Provider>
)
}

View File

@@ -0,0 +1,71 @@
import React, { createContext, useContext, useState, useEffect, ReactNode } from 'react'
import { startLibp2p } from '../lib/libp2p.js'
import { ChatProvider } from './chat.js'
import type { Libp2p, PubSub } from '@libp2p/interface'
import type { Identify } from '@libp2p/identify'
import type { DirectMessage } from '../lib/direct-message.js'
import type { DelegatedRoutingV1HttpApiClient } from '@helia/delegated-routing-v1-http-api-client'
import { Booting } from '../components/booting.js'
export type Libp2pType = Libp2p<{
pubsub: PubSub
identify: Identify
directMessage: DirectMessage
delegatedRouting: DelegatedRoutingV1HttpApiClient
}>
export const libp2pContext = createContext<{ libp2p: Libp2pType }>({
// @ts-ignore to avoid having to check isn't undefined everywhere. Can't be undefined because children are conditionally rendered
libp2p: undefined,
})
interface WrapperProps {
children?: ReactNode
}
// This is needed to prevent libp2p from instantiating more than once
let loaded = false
export function AppWrapper({ children }: WrapperProps) {
const [libp2p, setLibp2p] = useState<Libp2pType | undefined>(undefined)
const [error, setError] = useState('')
useEffect(() => {
const init = async () => {
if (loaded) return
try {
loaded = true
const libp2p = await startLibp2p()
if (!libp2p) {
throw new Error('failed to start libp2p')
}
// @ts-ignore
globalThis.libp2p = libp2p
setLibp2p(libp2p as Libp2pType)
} catch (e) {
console.error('failed to start libp2p', e)
setError(`failed to start libp2p ${e}`)
}
}
init()
}, [])
if (!libp2p) {
return <Booting error={error} />
}
return (
<>
<libp2pContext.Provider value={{ libp2p }}>
{<ChatProvider>{children}</ChatProvider>}
</libp2pContext.Provider>
</>
)
}
export function useLibp2pContext() {
return useContext(libp2pContext)
}

View File

@@ -0,0 +1,31 @@
import { useEffect, useCallback } from 'react'
import { ChatMessage, useChatContext } from '../context/chat.js'
export const useMarkAsRead = (msgId: string, peerId: string, read: boolean, dm: boolean) => {
const { messageHistory, setMessageHistory, directMessages, setDirectMessages } = useChatContext()
const markAsRead = useCallback((messages: ChatMessage[], msgId: string): ChatMessage[] => {
return messages.map((m) => (m.msgId === msgId ? { ...m, read: true } : m))
}, [])
useEffect(() => {
if (read) {
return
}
if (dm) {
const updatedDMs = directMessages[peerId]
if (updatedDMs.some((m) => m.msgId === msgId && !m.read)) {
setDirectMessages((prev) => ({
...prev,
[peerId]: markAsRead(updatedDMs, msgId),
}))
}
} else {
if (messageHistory.some((m) => m.msgId === msgId && !m.read)) {
setMessageHistory((prev) => markAsRead(prev, msgId))
}
}
}, [dm, directMessages, messageHistory, msgId, peerId, read, setDirectMessages, setMessageHistory, markAsRead])
}

View File

@@ -0,0 +1,201 @@
import { PeerId, Stream, Connection, TypedEventEmitter, Startable } from '@libp2p/interface'
import { DIRECT_MESSAGE_PROTOCOL, MIME_TEXT_PLAIN } from '../constants.js'
import { serviceCapabilities, serviceDependencies } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { Registrar } from '@libp2p/interface-internal'
import { dm } from '../protobuf/direct-message.js'
import { pbStream } from 'it-protobuf-stream'
export const dmClientVersion = '0.0.1'
export const directMessageEvent = 'message'
const ERRORS = {
EMPTY_MESSAGE: 'Message cannot be empty',
NO_CONNECTION: 'Failed to create connection',
NO_STREAM: 'Failed to create stream',
NO_RESPONSE: 'No response received',
NO_METADATA: 'No metadata in response',
STATUS_NOT_OK: (status: dm.Status) => `Received status: ${status}, expected OK`,
}
export interface DirectMessageEvent {
content: string
type: string
stream: Stream
connection: Connection
}
export interface DirectMessageEvents {
message: CustomEvent<DirectMessageEvent>
}
interface DirectMessageComponents {
registrar: Registrar
connectionManager: ConnectionManager
}
export class DirectMessage extends TypedEventEmitter<DirectMessageEvents> implements Startable {
readonly [serviceDependencies]: string[] = [
'@libp2p/identify',
'@libp2p/connection-encryption',
'@libp2p/transport',
'@libp2p/stream-multiplexing',
]
readonly [serviceCapabilities]: string[] = ['@universal-connectivity/direct-message']
private topologyId?: string
private readonly components: DirectMessageComponents
private dmPeers: Set<string> = new Set()
constructor(components: DirectMessageComponents) {
super()
this.components = components
}
async start(): Promise<void> {
this.topologyId = await this.components.registrar.register(DIRECT_MESSAGE_PROTOCOL, {
onConnect: this.handleConnect.bind(this),
onDisconnect: this.handleDisconnect.bind(this),
})
}
async afterStart(): Promise<void> {
await this.components.registrar.handle(DIRECT_MESSAGE_PROTOCOL, async ({ stream, connection }) => {
await this.receive(stream, connection)
})
}
stop(): void {
if (this.topologyId != null) {
this.components.registrar.unregister(this.topologyId)
}
}
private handleConnect(peerId: PeerId): void {
this.dmPeers.add(peerId.toString())
}
private handleDisconnect(peerId: PeerId): void {
this.dmPeers.delete(peerId.toString())
}
isDMPeer(peerId: PeerId): boolean {
return this.dmPeers.has(peerId.toString())
}
async send(peerId: PeerId, message: string): Promise<boolean> {
if (!message) {
throw new Error(ERRORS.EMPTY_MESSAGE)
}
let stream: Stream | undefined
try {
// openConnection will return the current open connection if it already exists, or create a new one
const conn = await this.components.connectionManager.openConnection(peerId, { signal: AbortSignal.timeout(5000) })
if (!conn) {
throw new Error(ERRORS.NO_CONNECTION)
}
// Single protocols can skip full negotiation
const stream = await conn.newStream(DIRECT_MESSAGE_PROTOCOL, {
negotiateFully: false,
})
if (!stream) {
throw new Error(ERRORS.NO_STREAM)
}
const datastream = pbStream(stream)
const req: dm.DirectMessageRequest = {
content: message,
type: MIME_TEXT_PLAIN,
metadata: {
clientVersion: dmClientVersion,
timestamp: BigInt(Date.now()),
},
}
const signal = AbortSignal.timeout(5000)
await datastream.write(req, dm.DirectMessageRequest, { signal })
const res = await datastream.read(dm.DirectMessageResponse, { signal })
if (!res) {
throw new Error(ERRORS.NO_RESPONSE)
}
if (!res.metadata) {
throw new Error(ERRORS.NO_METADATA)
}
if (res.status !== dm.Status.OK) {
throw new Error(ERRORS.STATUS_NOT_OK(res.status))
}
} catch (e: any) {
stream?.abort(e)
throw e
} finally {
try {
await stream?.close({
signal: AbortSignal.timeout(5000),
})
} catch (err: any) {
stream?.abort(err)
throw err
}
}
return true
}
async receive(stream: Stream, connection: Connection): Promise<void> {
try {
const datastream = pbStream(stream)
const signal = AbortSignal.timeout(5000)
const req = await datastream.read(dm.DirectMessageRequest, { signal })
const res: dm.DirectMessageResponse = {
status: dm.Status.OK,
metadata: {
clientVersion: dmClientVersion,
timestamp: BigInt(Date.now()),
},
}
await datastream.write(res, dm.DirectMessageResponse, { signal })
const detail: DirectMessageEvent = {
content: req.content,
type: req.type,
stream: stream,
connection: connection,
}
this.dispatchEvent(new CustomEvent(directMessageEvent, { detail }))
} catch (e: any) {
stream?.abort(e)
throw e
} finally {
try {
await stream?.close({
signal: AbortSignal.timeout(5000),
})
} catch (err: any) {
stream?.abort(err)
throw err
}
}
}
}
export function directMessage() {
return (components: DirectMessageComponents) => {
return new DirectMessage(components)
}
}

View File

@@ -0,0 +1,113 @@
import {
createDelegatedRoutingV1HttpApiClient,
} from '@helia/delegated-routing-v1-http-api-client'
import { createLibp2p } from 'libp2p'
import { identify } from '@libp2p/identify'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import { sha256 } from 'multiformats/hashes/sha2'
import type { Message, SignedMessage } from '@libp2p/interface'
import { gossipsub } from '@chainsafe/libp2p-gossipsub'
import { webSockets } from '@libp2p/websockets'
import { webRTC, webRTCDirect } from '@libp2p/webrtc'
import { circuitRelayTransport } from '@libp2p/circuit-relay-v2'
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
import { ping } from '@libp2p/ping'
import { BOOTSTRAP_PEER_IDS, CHAT_TOPIC, PUBSUB_PEER_DISCOVERY } from '../constants.js'
import { directMessage } from './direct-message.js'
import { quic } from '@chainsafe/libp2p-quic'
import { tcp } from '@libp2p/tcp'
import { peerIdFromString } from '@libp2p/peer-id'
// message IDs are used to dedupe inbound messages
// every agent in network should use the same message id function
// messages could be perceived as duplicate if this isn't added (as opposed to
// rust peer which has unique message ids)
export async function msgIdFnStrictNoSign(msg: Message): Promise<Uint8Array> {
var enc = new TextEncoder()
const signedMessage = msg as SignedMessage
const encodedSeqNum = enc.encode(signedMessage.sequenceNumber.toString())
return await sha256.encode(encodedSeqNum)
}
export async function startLibp2p () {
const delegatedClient = createDelegatedRoutingV1HttpApiClient('https://delegated-ipfs.dev')
const node = await createLibp2p({
addresses: {
listen: [
'/webrtc-direct',
'/ip4/0.0.0.0/tcp/0',
'/ip4/0.0.0.0/udp/0/quic-v1'
]
},
transports: [
webSockets(),
webRTC(),
webRTCDirect(),
circuitRelayTransport(),
quic(),
tcp()
],
connectionEncrypters: [noise()],
streamMuxers: [yamux()],
connectionGater: {
denyDialMultiaddr: async () => false
},
peerDiscovery: [
pubsubPeerDiscovery({
interval: 10_000,
topics: [PUBSUB_PEER_DISCOVERY],
listenOnly: false
})
],
services: {
pubsub: gossipsub({
allowPublishToZeroTopicPeers: true,
msgIdFn: msgIdFnStrictNoSign,
ignoreDuplicatePublishError: true,
}),
// Delegated routing helps us discover the ephemeral multiaddrs of the
// dedicated go and rust bootstrap peers
// This relies on the public delegated routing endpoint
// See https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing
delegatedRouting: () => delegatedClient,
identify: identify(),
// Custom protocol for direct messaging
directMessage: directMessage(),
ping: ping()
}
})
// subscribe to incoming chat messages
node.services.pubsub.subscribe(CHAT_TOPIC)
// find and dial the bootstrap peers
Promise.resolve().then(async () => {
for (const id of BOOTSTRAP_PEER_IDS) {
const peerId = peerIdFromString(id)
const peer = await node.peerRouting.findPeer(peerId, {
useCache: false
})
await node.dial(peer.id)
}
})
.catch(err => {
console.error('bootstrap error', err)
})
// try to dial topic peers - this is a hack to make them appear in the chat
// peer list.
//
// Note that we do not need a connection to a peer to receive its messages
// since they will be forwarded on by mesh peers. For more info see the spec:
// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/README.md
node.services.pubsub.addEventListener('message', (evt) => {
if (evt.detail.topic === CHAT_TOPIC && evt.detail.type === 'signed') {
node.dial(evt.detail.from)
.catch(() => {})
}
})
return node
}

View File

@@ -0,0 +1,16 @@
import { PeerId } from '@libp2p/interface'
import { peerIdFromString } from '@libp2p/peer-id'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
/**
* use the last 6 chars of the peer id as a hex code to create a deterministic
* color
*/
export function peerColor (peerId: PeerId | string): string {
if (typeof peerId === 'string') {
peerId = peerIdFromString(peerId)
}
const peerIdString = uint8ArrayToString(peerId.toCID().bytes, 'base16')
return peerIdString.substring(peerIdString.length - 6).toUpperCase()
}

View File

@@ -0,0 +1,13 @@
export interface PositionProps {
x: number
y: number
height: number
width: number
}
export const layout = {
margin: 2,
bannerHeight: 3,
peerListWidth: 40,
inputHeight: 2
}

View File

@@ -0,0 +1,12 @@
import { PeerId, isPeerId } from '@libp2p/interface'
/**
* Returns the last `length` characters of the peer id
*/
export function shortPeerId (peerId: PeerId | string, length = 7): string {
if (isPeerId(peerId)) {
peerId = peerId.toString()
}
return peerId.substring(peerId.length - length)
}

View File

@@ -0,0 +1,30 @@
syntax = "proto3";
package dm;
service DirectMessage {
rpc DirectMessage (DirectMessageRequest) returns (DirectMessageResponse) {}
}
message Metadata {
string clientVersion = 1; // client version
int64 timestamp = 2; // unix time
}
enum Status {
UNKNOWN = 0;
OK = 200;
ERROR = 500;
}
message DirectMessageRequest {
Metadata metadata = 1;
string content = 2;
string type = 3;
}
message DirectMessageResponse{
Metadata metadata = 1;
Status status = 2;
optional string statusText = 3;
}

View File

@@ -0,0 +1,356 @@
/* eslint-disable import/export */
/* eslint-disable complexity */
/* eslint-disable @typescript-eslint/no-namespace */
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */
import { type Codec, decodeMessage, type DecodeOptions, encodeMessage, enumeration, message } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'
export interface dm {}
export namespace dm {
export interface DirectMessage {}
export namespace DirectMessage {
let _codec: Codec<DirectMessage>
export const codec = (): Codec<DirectMessage> => {
if (_codec == null) {
_codec = message<DirectMessage>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<DirectMessage>): Uint8Array => {
return encodeMessage(obj, DirectMessage.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<DirectMessage>): DirectMessage => {
return decodeMessage(buf, DirectMessage.codec(), opts)
}
}
export interface Metadata {
clientVersion: string
timestamp: bigint
}
export namespace Metadata {
let _codec: Codec<Metadata>
export const codec = (): Codec<Metadata> => {
if (_codec == null) {
_codec = message<Metadata>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if ((obj.clientVersion != null && obj.clientVersion !== '')) {
w.uint32(10)
w.string(obj.clientVersion)
}
if ((obj.timestamp != null && obj.timestamp !== 0n)) {
w.uint32(16)
w.int64(obj.timestamp)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
clientVersion: '',
timestamp: 0n
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.clientVersion = reader.string()
break
}
case 2: {
obj.timestamp = reader.int64()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<Metadata>): Uint8Array => {
return encodeMessage(obj, Metadata.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<Metadata>): Metadata => {
return decodeMessage(buf, Metadata.codec(), opts)
}
}
export enum Status {
UNKNOWN = 'UNKNOWN',
OK = 'OK',
ERROR = 'ERROR'
}
enum __StatusValues {
UNKNOWN = 0,
OK = 200,
ERROR = 500
}
export namespace Status {
export const codec = (): Codec<Status> => {
return enumeration<Status>(__StatusValues)
}
}
export interface DirectMessageRequest {
metadata?: dm.Metadata
content: string
type: string
}
export namespace DirectMessageRequest {
let _codec: Codec<DirectMessageRequest>
export const codec = (): Codec<DirectMessageRequest> => {
if (_codec == null) {
_codec = message<DirectMessageRequest>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (obj.metadata != null) {
w.uint32(10)
dm.Metadata.codec().encode(obj.metadata, w)
}
if ((obj.content != null && obj.content !== '')) {
w.uint32(18)
w.string(obj.content)
}
if ((obj.type != null && obj.type !== '')) {
w.uint32(26)
w.string(obj.type)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
content: '',
type: ''
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.metadata = dm.Metadata.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.metadata
})
break
}
case 2: {
obj.content = reader.string()
break
}
case 3: {
obj.type = reader.string()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<DirectMessageRequest>): Uint8Array => {
return encodeMessage(obj, DirectMessageRequest.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<DirectMessageRequest>): DirectMessageRequest => {
return decodeMessage(buf, DirectMessageRequest.codec(), opts)
}
}
export interface DirectMessageResponse {
metadata?: dm.Metadata
status: dm.Status
statusText?: string
}
export namespace DirectMessageResponse {
let _codec: Codec<DirectMessageResponse>
export const codec = (): Codec<DirectMessageResponse> => {
if (_codec == null) {
_codec = message<DirectMessageResponse>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (obj.metadata != null) {
w.uint32(10)
dm.Metadata.codec().encode(obj.metadata, w)
}
if (obj.status != null && __StatusValues[obj.status] !== 0) {
w.uint32(16)
dm.Status.codec().encode(obj.status, w)
}
if (obj.statusText != null) {
w.uint32(26)
w.string(obj.statusText)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
status: Status.UNKNOWN
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.metadata = dm.Metadata.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.metadata
})
break
}
case 2: {
obj.status = dm.Status.codec().decode(reader)
break
}
case 3: {
obj.statusText = reader.string()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<DirectMessageResponse>): Uint8Array => {
return encodeMessage(obj, DirectMessageResponse.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<DirectMessageResponse>): DirectMessageResponse => {
return decodeMessage(buf, DirectMessageResponse.codec(), opts)
}
}
let _codec: Codec<dm>
export const codec = (): Codec<dm> => {
if (_codec == null) {
_codec = message<dm>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<dm>): Uint8Array => {
return encodeMessage(obj, dm.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<dm>): dm => {
return decodeMessage(buf, dm.codec(), opts)
}
}