feat: direct messages (#175)

* feat: direct messages (#5)

* chore: use Libp2pType

* feat: add direct messaging as a custom service

* chore: move init of directmessage handler

* fix: add await to dm receive

* chore: small simplification (#6)

* chore: small simplification

* chore: remove from field which can be derived

---------

Co-authored-by: Daniel N <2color@users.noreply.github.com>

* chore: signals

* chore: remove menu, add popover

* chore: move markAsRead to hook

* chore: wrap dm receive in try catch

---------

Co-authored-by: Daniel Norman <1992255+2color@users.noreply.github.com>
Co-authored-by: Daniel N <2color@users.noreply.github.com>
This commit is contained in:
dozyio
2024-09-17 13:04:24 +01:00
committed by GitHub
parent 21cb6bb27b
commit 750150c0e4
18 changed files with 2655 additions and 1076 deletions

View File

@@ -1,3 +1,7 @@
{
"extends": "next/core-web-vitals"
"extends": "next/core-web-vitals",
"rules": {
"@next/next/no-img-element": "off"
},
"ignorePatterns": ["node_modules/", "build/", ".next/", "src/lib/protobuf/"]
}

2
js-peer/.gitignore vendored
View File

@@ -34,3 +34,5 @@ yarn-error.log*
# typescript
*.tsbuildinfo
next-env.d.ts
certificates

2583
js-peer/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +1,12 @@
{
"name": "universal-connectivity-browser",
"scripts": {
"dev": "next dev",
"dev": "next dev --experimental-https",
"build": "next build",
"start": "next start",
"lint": "next lint"
"lint": "next lint",
"tsc": "tsc --noEmit -p tsconfig.json",
"protobuf": "protons src/lib/protobuf/*.proto"
},
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^13.0.0",
@@ -28,8 +30,10 @@
"it-length-prefixed": "^9.0.4",
"it-map": "^3.1.0",
"it-pipe": "^3.0.1",
"it-protobuf-stream": "^1.1.4",
"libp2p": "^1.6.1",
"next": "14.2.3",
"protons-runtime": "^5.4.0",
"react": "18.3.1",
"react-18-blockies": "^1.0.6",
"react-dom": "18.3.1",
@@ -46,6 +50,7 @@
"eslint": "8.57.0",
"eslint-config-next": "14.2.3",
"postcss": "^8.4.38",
"protons": "^7.5.0",
"tailwindcss": "^3.4.3",
"typescript": "5.4.5"
}

View File

@@ -2,7 +2,7 @@ import { useLibp2pContext } from '@/context/ctx'
import { CHAT_TOPIC } from '@/lib/constants'
import React, { useEffect, useState } from 'react'
import type { PeerId } from '@libp2p/interface'
import Blockies from 'react-18-blockies'
import { PeerWrapper } from './peer'
export function ChatPeerList() {
const { libp2p } = useLibp2pContext()
@@ -10,7 +10,7 @@ export function ChatPeerList() {
useEffect(() => {
const onSubscriptionChange = () => {
const subscribers = libp2p.services.pubsub.getSubscribers(CHAT_TOPIC)
const subscribers = libp2p.services.pubsub.getSubscribers(CHAT_TOPIC) as PeerId[]
setSubscribers(subscribers)
}
onSubscriptionChange()
@@ -23,28 +23,16 @@ export function ChatPeerList() {
return (
<div className="border-l border-gray-300 lg:col-span-1">
<h2 className="my-2 mb-2 ml-2 text-lg text-gray-600">Peers</h2>
<ul className="overflow-auto h-[32rem]">
{<Peer key={libp2p.peerId.toString()} peer={libp2p.peerId} self />}
<div className="overflow-auto h-[32rem]">
<div className="px-3 py-2 border-b border-gray-300 focus:outline-none">
{<PeerWrapper peer={libp2p.peerId} self withName={true} withUnread={false} />}
</div>
{subscribers.map((p) => (
<Peer key={p.toString()} peer={p} self={false} />
<div key={p.toString()} className="px-3 py-2 border-b border-gray-300 focus:outline-none">
<PeerWrapper peer={p} self={false} withName={true} withUnread={true} />
</div>
))}
</ul>
</div>
</div>
)
}
function Peer({ peer, self }: { peer: PeerId; self: boolean }) {
return (
<li className="flex items-center px-3 py-2 text-sm transition duration-150 ease-in-out border-b border-gray-300 focus:outline-none">
<Blockies seed={peer.toString()} size={15} scale={3} className="rounded max-h-10 max-w-10" />
<div className="w-full pb-2">
<div className="flex justify-between">
<span className={`block ml-2 font-semibold ${self ? 'text-indigo-700-600' : 'text-gray-600'}`}>
{peer.toString().slice(-7)}
{self && ' (You)'}
</span>
</div>
</div>
</li>
)
}

View File

@@ -1,21 +1,30 @@
import { useLibp2pContext } from '@/context/ctx'
import React, { useCallback, useEffect, useRef, useState } from 'react'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL } from '@/lib/constants'
import { CHAT_FILE_TOPIC, CHAT_TOPIC } from '@/lib/constants'
import { ChatFile, ChatMessage, useChatContext } from '../context/chat-ctx'
import { v4 as uuidv4 } from 'uuid'
import { MessageComponent } from './message'
import { Message } from './message'
import { forComponent } from '@/lib/logger'
import { ChatPeerList } from './chat-peer-list'
import { ChevronLeftIcon } from '@heroicons/react/20/solid'
import Blockies from 'react-18-blockies'
import { peerIdFromString } from '@libp2p/peer-id'
const log = forComponent('chat')
export const PUBLIC_CHAT_ROOM_ID = ''
const PUBLIC_CHAT_ROOM_NAME = 'Public Chat'
export default function ChatContainer() {
const { libp2p } = useLibp2pContext()
const { messageHistory, setMessageHistory, files, setFiles } = useChatContext()
const { roomId, setRoomId } = useChatContext()
const { messageHistory, setMessageHistory, directMessages, setDirectMessages, files, setFiles } = useChatContext()
const [input, setInput] = useState<string>('')
const fileRef = useRef<HTMLInputElement>(null)
const [messages, setMessages] = useState<ChatMessage[]>([])
const sendMessage = useCallback(async () => {
// Send message to public chat over gossipsub
const sendPublicMessage = useCallback(async () => {
if (input === '') return
log(
@@ -33,11 +42,56 @@ export default function ChatContainer() {
setMessageHistory([
...messageHistory,
{ msg: input, fileObjectUrl: undefined, from: 'me', peerId: myPeerId },
{
msgId: crypto.randomUUID(),
msg: input,
fileObjectUrl: undefined,
peerId: myPeerId,
read: true,
receivedAt: Date.now(),
},
])
setInput('')
}, [input, messageHistory, setInput, libp2p, setMessageHistory])
// Send direct message over custom protocol
const sendDirectMessage = useCallback(async () => {
try {
const res = await libp2p.services.directMessage.send(peerIdFromString(roomId), input)
if (!res) {
log('Failed to send message')
return
}
const myPeerId = libp2p.peerId.toString()
const newMessage: ChatMessage = {
msgId: crypto.randomUUID(),
msg: input,
fileObjectUrl: undefined,
peerId: myPeerId,
read: true,
receivedAt: Date.now(),
}
const updatedMessages = directMessages[roomId]
? [...directMessages[roomId], newMessage]
: [newMessage]
setDirectMessages({
...directMessages,
[roomId]: updatedMessages,
})
setInput('')
} catch (e: any) {
log(e)
}
}, [libp2p, setDirectMessages, directMessages, roomId, input])
const sendFile = useCallback(
async (readerEvent: ProgressEvent<FileReader>) => {
const fileBody = readerEvent.target?.result as ArrayBuffer
@@ -62,10 +116,12 @@ export default function ChatContainer() {
)
const msg: ChatMessage = {
msgId: crypto.randomUUID(),
msg: newChatFileMessage(file.id, file.body),
fileObjectUrl: window.URL.createObjectURL(new Blob([file.body])),
from: 'me',
peerId: myPeerId,
read: true,
receivedAt: Date.now(),
}
setMessageHistory([...messageHistory, msg])
},
@@ -81,16 +137,24 @@ export default function ChatContainer() {
if (e.key !== 'Enter') {
return
}
sendMessage()
if (roomId === PUBLIC_CHAT_ROOM_ID) {
sendPublicMessage()
} else {
sendDirectMessage()
}
},
[sendMessage],
[sendPublicMessage, sendDirectMessage, roomId],
)
const handleSend = useCallback(
async (e: React.MouseEvent<HTMLButtonElement>) => {
sendMessage()
if (roomId === PUBLIC_CHAT_ROOM_ID) {
sendPublicMessage()
} else {
sendDirectMessage()
}
},
[sendMessage],
[sendPublicMessage, sendDirectMessage, roomId],
)
const handleInput = useCallback(
@@ -120,33 +184,91 @@ export default function ChatContainer() {
[fileRef],
)
const handleBackToPublic = () => {
setRoomId(PUBLIC_CHAT_ROOM_ID)
setMessages(messageHistory)
}
useEffect(() => {
// assumes a chat room is a peerId thus a direct message
if (roomId === PUBLIC_CHAT_ROOM_ID) {
setMessages(messageHistory)
} else {
setMessages(directMessages[roomId] || [])
}
}, [roomId, directMessages, messageHistory])
return (
<div className="container mx-auto">
<div className="min-w-full border rounded lg:grid lg:grid-cols-6">
<div className="lg:col-span-5 lg:block">
<div className="w-full">
<div className="relative flex items-center p-3 border-b border-gray-300">
<span className="block ml-2 font-bold text-gray-600">Public Chat</span>
{roomId === PUBLIC_CHAT_ROOM_ID &&
<span className="block ml-2 font-bold text-gray-600">{PUBLIC_CHAT_ROOM_NAME}</span>
}
{roomId !== PUBLIC_CHAT_ROOM_ID && (
<>
<Blockies
seed={roomId}
size={8}
scale={3}
className="rounded mr-2 max-h-10 max-w-10"
/>
<span className={`text-gray-500 flex`}>
{roomId.toString().slice(-7)}
</span>
<button
onClick={handleBackToPublic}
className="text-gray-500 flex ml-auto"
>
<ChevronLeftIcon className="w-6 h-6 text-gray-500" />
<span>Back to Public Chat</span>
</button>
</>
)}
</div>
<div className="relative w-full flex flex-col-reverse p-3 overflow-y-auto h-[40rem] bg-gray-100">
<ul className="space-y-2">
{/* messages start */}
{messageHistory.map(({ msg, fileObjectUrl, from, peerId }, idx) => (
<MessageComponent
key={idx}
msg={msg}
fileObjectUrl={fileObjectUrl}
from={from}
peerId={peerId}
/>
))}
{/* messages end */}
{messages.map(
({
msgId,
msg,
fileObjectUrl,
peerId,
read,
receivedAt,
}: ChatMessage) => (
<Message
key={msgId}
dm={roomId !== ''}
msg={msg}
fileObjectUrl={fileObjectUrl}
peerId={peerId}
read={read}
msgId={msgId}
receivedAt={receivedAt}
/>
),
)}
</ul>
</div>
<div className="flex items-center justify-between w-full p-3 border-t border-gray-300">
<input ref={fileRef} className="hidden" type="file" onChange={handleFileInput} />
<button onClick={handleFileSend}>
<input
ref={fileRef}
className="hidden"
type="file"
onChange={handleFileInput}
disabled={roomId !== PUBLIC_CHAT_ROOM_ID}
/>
<button
onClick={handleFileSend}
disabled={roomId !== PUBLIC_CHAT_ROOM_ID}
title={roomId === PUBLIC_CHAT_ROOM_ID ? 'Upload file' : "Unsupported in DM's" }
className={roomId === PUBLIC_CHAT_ROOM_ID ? '' : 'cursor-not-allowed'}
>
<svg
xmlns="http://www.w3.org/2000/svg"
className="w-5 h-5 text-gray-500"

View File

@@ -1,16 +1,32 @@
import React, { useEffect } from 'react'
import { useLibp2pContext } from '@/context/ctx'
import React, { useCallback, useEffect, useRef, useState } from 'react'
import { ChatMessage } from '@/context/chat-ctx'
import Blockies from 'react-18-blockies'
import { ChatMessage, useChatContext } from '@/context/chat-ctx'
import { PeerWrapper } from './peer'
import { peerIdFromString } from '@libp2p/peer-id'
import { useMarkAsRead } from '@/hooks/useMarkAsRead'
interface MessageProps extends ChatMessage {}
interface Props extends ChatMessage {
dm: boolean
}
export function MessageComponent({ msg, fileObjectUrl, from, peerId }: MessageProps) {
export const Message = ({ msgId, msg, fileObjectUrl, peerId, read, dm, receivedAt }: Props) => {
const { libp2p } = useLibp2pContext()
const isSelf: boolean = libp2p.peerId.equals(peerId)
const timestamp = new Date(receivedAt).toLocaleString()
useMarkAsRead(msgId, peerId, read, dm)
return (
<li className={`flex ${from === 'me' && 'flex-row-reverse'} gap-2`}>
<Blockies seed={peerId} size={15} scale={3} className="rounded max-h-10 max-w-10" />
<li className={`flex ${isSelf && 'flex-row-reverse'} gap-2`}>
<PeerWrapper
key={peerId}
peer={peerIdFromString(peerId)}
self={isSelf}
withName={false}
withUnread={false}
/>
<div className="flex relative max-w-xl px-4 py-2 text-gray-700 rounded shadow bg-white">
<div className="block">
{msg}
@@ -24,8 +40,9 @@ export function MessageComponent({ msg, fileObjectUrl, from, peerId }: MessagePr
)}
</p>
<p className="italic text-gray-400">
{peerId !== libp2p.peerId.toString() ? `from: ${peerId.slice(-4)}` : null}{' '}
{!dm && peerId !== libp2p.peerId.toString() ? `from: ${peerId.slice(-4)}` : null}{' '}
</p>
<span className="relative pl-1 text-xs text-slate-400">{timestamp}</span>
</div>
</div>
</li>

View File

@@ -0,0 +1,83 @@
import { useLibp2pContext } from "@/context/ctx"
import { useEffect, useState } from "react"
import { PeerId } from '@libp2p/interface'
import { useChatContext } from "@/context/chat-ctx"
import Blockies from 'react-18-blockies'
export interface PeerProps {
peer: PeerId,
self: boolean,
withName: boolean,
withUnread: boolean
}
export function PeerWrapper({ peer, self, withName, withUnread }: PeerProps) {
const { libp2p } = useLibp2pContext()
const [identified, setIdentified] = useState(false)
const { setRoomId } = useChatContext()
const handleSetRoomId = () => {
setRoomId(peer.toString())
}
useEffect(() => {
const init = async () => {
if (await libp2p.peerStore.has(peer)) {
const p = await libp2p.peerStore.get(peer)
if (p.protocols.length > 0) {
setIdentified(true)
}
}
}
init()
}, [libp2p.peerStore, peer])
if (self || !identified) {
return (
<Peer peer={peer} self={self} withName={withName} withUnread={withUnread} />
)
}
if (identified && libp2p.services.directMessage.isDMPeer(peer)) {
return (
<div className="relative inline-block text-left cursor-pointer" onClick={() => handleSetRoomId()}>
<Peer peer={peer} self={self} withName={withName} withUnread={withUnread} />
</div>
)
}
if (identified && !libp2p.services.directMessage.isDMPeer(peer)) {
return (
<div className="relative inline-block text-left group">
<Peer peer={peer} self={self} withName={withName} withUnread={withUnread} />
<div className="absolute top-10 left-5 scale-0 rounded bg-white border text-gray-600 p-2 text-xs group-hover:scale-100 z-10">Direct{'\u00A0'}message unsupported</div>
</div>
)
}
}
export function Peer({ peer, self, withName, withUnread }: PeerProps) {
const { directMessages } = useChatContext()
return (
<div className="flex items-stretch text-sm transition duration-150 ease-in-out focus:outline-none relative text-left">
<Blockies seed={peer.toString()} size={15} scale={3} className="rounded max-h-10 max-w-10" />
{withName &&
<div className="w-full">
<div className="flex justify-between">
<span className={`block ml-2 font-semibold ${self ? 'text-indigo-700-600' : 'text-gray-600'}`}>
{peer.toString().slice(-7)}
{self && ' (You)'}
</span>
</div>
{withUnread && (
<div className="ml-2 text-gray-600">
{directMessages[peer.toString()]?.filter((m) => !m.read).length ? `(${directMessages[peer.toString()]?.filter((m) => !m.read).length} unread)` : ''}
</div>
)}
</div>
}
</div>
)
}

View File

@@ -1,22 +1,24 @@
import React, { createContext, useContext, useEffect, useState } from 'react';
import { useLibp2pContext } from './ctx';
import type { Message } from '@libp2p/interface'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL, PUBSUB_PEER_DISCOVERY } from '@/lib/constants'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL, MIME_TEXT_PLAIN, PUBSUB_PEER_DISCOVERY } from '@/lib/constants'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { pipe } from 'it-pipe'
import map from 'it-map'
import * as lp from 'it-length-prefixed'
import { forComponent } from '@/lib/logger';
import { forComponent } from '@/lib/logger'
import { DirectMessageEvent, directMessageEvent } from '@/lib/direct-message'
const log = forComponent('chat-context')
export interface ChatMessage {
msg: string
fileObjectUrl: string | undefined
from: 'me' | 'other'
peerId: string
msgId: string
msg: string
fileObjectUrl: string | undefined
peerId: string
read: boolean
receivedAt: number
}
export interface ChatFile {
@@ -25,26 +27,52 @@ export interface ChatFile {
sender: string
}
export interface ChatContextInterface {
messageHistory: ChatMessage[];
setMessageHistory: (messageHistory: ChatMessage[]) => void;
files: Map<string, ChatFile>
setFiles: (files: Map<string, ChatFile>) => void;
export interface DirectMessages {
[peerId: string]: ChatMessage[]
}
type Chatroom = string
export interface ChatContextInterface {
messageHistory: ChatMessage[]
setMessageHistory: (
messageHistory:
| ChatMessage[]
| ((prevMessages: ChatMessage[]) => ChatMessage[]),
) => void
directMessages: DirectMessages
setDirectMessages: (
directMessages:
| DirectMessages
| ((prevMessages: DirectMessages) => DirectMessages),
) => void
roomId: Chatroom
setRoomId: (chatRoom: Chatroom) => void
files: Map<string, ChatFile>
setFiles: (files: Map<string, ChatFile>) => void
}
export const chatContext = createContext<ChatContextInterface>({
messageHistory: [],
messageHistory: [],
setMessageHistory: () => {},
directMessages: {},
setDirectMessages: () => {},
roomId: '',
setRoomId: () => {},
files: new Map<string, ChatFile>(),
setMessageHistory: () => { },
setFiles: () => { }
setFiles: () => {},
})
export const useChatContext = () => {
return useContext(chatContext);
};
return useContext(chatContext)
}
export const ChatProvider = ({ children }: any) => {
const [messageHistory, setMessageHistory] = useState<ChatMessage[]>([]);
const [messageHistory, setMessageHistory] = useState<ChatMessage[]>([]);
const [directMessages, setDirectMessages] = useState<DirectMessages>({})
const [files, setFiles] = useState<Map<string, ChatFile>>(new Map<string, ChatFile>());
const [roomId, setRoomId] = useState<Chatroom>('')
const { libp2p } = useLibp2pContext()
const messageCB = (evt: CustomEvent<Message>) => {
@@ -75,7 +103,17 @@ export const ChatProvider = ({ children }: any) => {
// Append signed messages, otherwise discard
if (evt.detail.type === 'signed') {
setMessageHistory([...messageHistory, { msg, fileObjectUrl: undefined, from: 'other', peerId: evt.detail.from.toString() }])
setMessageHistory([
...messageHistory,
{
msgId: crypto.randomUUID(),
msg,
fileObjectUrl: undefined,
peerId: evt.detail.from.toString(),
read: false,
receivedAt: Date.now(),
},
])
}
}
@@ -104,10 +142,12 @@ export const ChatProvider = ({ children }: any) => {
log(`chat file message request_response: response received: size:${body.length}`)
const msg: ChatMessage = {
msgId: crypto.randomUUID(),
msg: newChatFileMessage(fileId, body),
fileObjectUrl: window.URL.createObjectURL(new Blob([body])),
from: 'other',
peerId: senderPeerId.toString(),
read: false,
receivedAt: Date.now(),
}
setMessageHistory([...messageHistory, msg])
}
@@ -118,6 +158,40 @@ export const ChatProvider = ({ children }: any) => {
}
}
useEffect(() => {
const handleDirectMessage = (evt: CustomEvent<DirectMessageEvent>) => {
const peerId = evt.detail.connection.remotePeer.toString()
if (evt.detail.type !== MIME_TEXT_PLAIN) {
throw new Error(`unexpected message type: ${evt.detail.type}`)
}
const message: ChatMessage = {
msg: evt.detail.content,
read: false,
msgId: crypto.randomUUID(),
fileObjectUrl: undefined,
peerId: peerId,
receivedAt: Date.now(),
}
const updatedMessages = directMessages[peerId]
? [...directMessages[peerId], message]
: [message]
setDirectMessages({
...directMessages,
[peerId]: updatedMessages,
})
}
libp2p.services.directMessage.addEventListener(directMessageEvent, handleDirectMessage)
return () => {
libp2p.services.directMessage.removeEventListener(directMessageEvent, handleDirectMessage)
}
}, [directMessages, libp2p.services.directMessage, setDirectMessages])
useEffect(() => {
libp2p.services.pubsub.addEventListener('message', messageCB)
@@ -144,11 +218,19 @@ export const ChatProvider = ({ children }: any) => {
}
})
return (
<chatContext.Provider value={{ messageHistory, setMessageHistory, files, setFiles }}>
{children}
</chatContext.Provider>
);
return (
<chatContext.Provider
value={{
roomId,
setRoomId,
messageHistory,
setMessageHistory,
directMessages,
setDirectMessages,
files,
setFiles
}}>
{children}
</chatContext.Provider>
);
};

View File

@@ -1,13 +1,18 @@
import React, { createContext, useContext, useState, useEffect, ReactNode } from 'react'
import type { Libp2p } from 'libp2p'
import { startLibp2p } from '../lib/libp2p'
import { ChatProvider } from './chat-ctx'
import { PubSub } from '@libp2p/interface'
import { Identify } from '@libp2p/identify'
import type { Libp2p, PubSub } from '@libp2p/interface'
import type { Identify } from '@libp2p/identify'
import type { DirectMessage } from '@/lib/direct-message'
import type { DelegatedRoutingV1HttpApiClient } from '@helia/delegated-routing-v1-http-api-client'
import { Booting } from '@/components/booting'
type Libp2pType = Libp2p<{ pubsub: PubSub; identify: Identify }>
export type Libp2pType = Libp2p<{
pubsub: PubSub;
identify: Identify;
directMessage: DirectMessage;
delegatedRouting: DelegatedRoutingV1HttpApiClient;
}>
export const libp2pContext = createContext<{ libp2p: Libp2pType }>({
// @ts-ignore to avoid having to check isn't undefined everywhere. Can't be undefined because children are conditionally rendered
@@ -21,7 +26,7 @@ interface WrapperProps {
// This is needed to prevent libp2p from instantiating more than once
let loaded = false
export function AppWrapper({ children }: WrapperProps) {
const [libp2p, setLibp2p] = useState<Libp2pType>()
const [libp2p, setLibp2p] = useState<Libp2pType | undefined>(undefined)
const [error, setError] = useState('')
useEffect(() => {
@@ -31,10 +36,14 @@ export function AppWrapper({ children }: WrapperProps) {
loaded = true
const libp2p = await startLibp2p()
if (!libp2p) {
throw new Error('failed to start libp2p')
}
// @ts-ignore
window.libp2p = libp2p
setLibp2p(libp2p)
setLibp2p(libp2p as Libp2pType)
} catch (e) {
console.error('failed to start libp2p', e)
setError(`failed to start libp2p ${e}`)

View File

@@ -0,0 +1,31 @@
import { useEffect, useCallback } from 'react'
import { ChatMessage, useChatContext } from '@/context/chat-ctx'
export const useMarkAsRead = (msgId: string, peerId: string, read: boolean, dm: boolean) => {
const { messageHistory, setMessageHistory, directMessages, setDirectMessages } = useChatContext()
const markAsRead = useCallback((messages: ChatMessage[], msgId: string): ChatMessage[] => {
return messages.map((m) => (m.msgId === msgId ? { ...m, read: true } : m))
}, [])
useEffect(() => {
if (read) {
return
}
if (dm) {
const updatedDMs = directMessages[peerId]
if (updatedDMs.some((m) => m.msgId === msgId && !m.read)) {
setDirectMessages((prev) => ({
...prev,
[peerId]: markAsRead(updatedDMs, msgId),
}))
}
} else {
if (messageHistory.some((m) => m.msgId === msgId && !m.read)) {
setMessageHistory((prev) => markAsRead(prev, msgId))
}
}
}, [dm, directMessages, messageHistory, msgId, peerId, read, setDirectMessages, setMessageHistory, markAsRead])
}

View File

@@ -0,0 +1,4 @@
export function classNames(...classes: string[]) {
return classes.filter(Boolean).join(' ')
}

View File

@@ -2,13 +2,15 @@ export const CHAT_TOPIC = "universal-connectivity"
export const CHAT_FILE_TOPIC = "universal-connectivity-file"
export const PUBSUB_PEER_DISCOVERY = "universal-connectivity-browser-peer-discovery"
export const FILE_EXCHANGE_PROTOCOL = "/universal-connectivity-file/1"
export const DIRECT_MESSAGE_PROTOCOL = "/universal-connectivity/dm/1.0.0"
export const CIRCUIT_RELAY_CODE = 290
export const MIME_TEXT_PLAIN = 'text/plain'
// 👇 App specific dedicated bootstrap PeerIDs
// Their multiaddrs are ephemeral so peer routing is used to resolve multiaddr
export const WEBRTC_BOOTSTRAP_PEER_ID = "12D3KooWGahRw3ZnM4gAyd9FK75v4Bp5keFYTvkcAwhpEm28wbV3"
export const WEBTRANSPORT_BOOTSTRAP_PEER_ID = "12D3KooWFhXabKDwALpzqMbto94sB7rvmZ6M28hs9Y9xSopDKwQr"
export const BOOTSTRAP_PEER_IDS = [WEBTRANSPORT_BOOTSTRAP_PEER_ID, WEBRTC_BOOTSTRAP_PEER_ID]
export const BOOTSTRAP_PEER_IDS = [WEBTRANSPORT_BOOTSTRAP_PEER_ID, WEBRTC_BOOTSTRAP_PEER_ID]

View File

@@ -0,0 +1,206 @@
import { PeerId, Stream, Connection, CustomEvent, TypedEventEmitter, Startable } from '@libp2p/interface'
import { DIRECT_MESSAGE_PROTOCOL, MIME_TEXT_PLAIN } from '@/lib/constants'
import { serviceCapabilities, serviceDependencies } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { Registrar } from '@libp2p/interface-internal'
import { dm } from '@/lib/protobuf/direct-message'
import { pbStream } from 'it-protobuf-stream'
export const dmClientVersion = '0.0.1'
export const directMessageEvent = 'message'
const ERRORS = {
EMPTY_MESSAGE: 'Message cannot be empty',
NO_CONNECTION: 'Failed to create connection',
NO_STREAM: 'Failed to create stream',
NO_RESPONSE: 'No response received',
NO_METADATA: 'No metadata in response',
STATUS_NOT_OK: (status: dm.Status) => `Received status: ${status}, expected OK`
}
export interface DirectMessageEvent {
content: string
type: string
stream: Stream
connection: Connection
}
export interface DirectMessageEvents {
'message': CustomEvent<DirectMessageEvent>
}
interface DirectMessageComponents {
registrar: Registrar
connectionManager: ConnectionManager
}
export class DirectMessage extends TypedEventEmitter<DirectMessageEvents> implements Startable {
readonly [serviceDependencies]: string[] = [
'@libp2p/identify',
'@libp2p/connection-encryption',
'@libp2p/transport',
'@libp2p/stream-multiplexing',
]
readonly [serviceCapabilities]: string[] = [
'@universal-connectivity/direct-message'
]
private topologyId?: string
private readonly components: DirectMessageComponents
private dmPeers: Set<string> = new Set()
constructor(components: DirectMessageComponents) {
super()
this.components = components
}
async start(): Promise<void> {
this.topologyId = await this.components.registrar.register(DIRECT_MESSAGE_PROTOCOL, {
onConnect: this.handleConnect.bind(this),
onDisconnect: this.handleDisconnect.bind(this)
})
}
async afterStart(): Promise<void> {
await this.components.registrar.handle(
DIRECT_MESSAGE_PROTOCOL,
async ({ stream, connection }) => {
await this.receive(stream, connection)
},
)
}
stop(): void {
if (this.topologyId != null) {
this.components.registrar.unregister(this.topologyId)
}
}
private handleConnect(peerId: PeerId): void {
this.dmPeers.add(peerId.toString())
}
private handleDisconnect(peerId: PeerId): void {
this.dmPeers.delete(peerId.toString())
}
isDMPeer(peerId: PeerId): boolean {
return this.dmPeers.has(peerId.toString())
}
async send(peerId: PeerId, message: string): Promise<boolean> {
if (!message) {
throw new Error(ERRORS.EMPTY_MESSAGE)
}
let stream: Stream | undefined
try {
// openConnection will return the current open connection if it already exists, or create a new one
const conn = await this.components.connectionManager.openConnection(peerId, { signal: AbortSignal.timeout(5000) })
if (!conn) {
throw new Error(ERRORS.NO_CONNECTION)
}
// Single protocols can skip full negotiation
const stream = await conn.newStream(DIRECT_MESSAGE_PROTOCOL, { negotiateFully: false })
if (!stream) {
throw new Error(ERRORS.NO_STREAM)
}
const datastream = pbStream(stream)
const req: dm.DirectMessageRequest = {
content: message,
type: MIME_TEXT_PLAIN,
metadata: {
clientVersion: dmClientVersion,
timestamp: BigInt(Date.now()),
},
}
const signal = AbortSignal.timeout(5000)
await datastream.write(req, dm.DirectMessageRequest, { signal })
const res = await datastream.read(dm.DirectMessageResponse, { signal })
if (!res) {
throw new Error(ERRORS.NO_RESPONSE)
}
if (!res.metadata) {
throw new Error(ERRORS.NO_METADATA)
}
if (res.status !== dm.Status.OK) {
throw new Error(ERRORS.STATUS_NOT_OK(res.status))
}
} catch (e: any) {
stream?.abort(e)
throw e
} finally {
try {
await stream?.close({
signal: AbortSignal.timeout(5000)
})
} catch (err: any) {
stream?.abort(err)
throw err
}
}
return true
}
async receive(stream: Stream, connection: Connection): Promise<void> {
try {
const datastream = pbStream(stream)
const signal = AbortSignal.timeout(5000)
const req = await datastream.read(dm.DirectMessageRequest, { signal })
const res: dm.DirectMessageResponse = {
status: dm.Status.OK,
metadata: {
clientVersion: dmClientVersion,
timestamp: BigInt(Date.now()),
},
}
await datastream.write(res, dm.DirectMessageResponse, { signal })
const detail: DirectMessageEvent = {
content: req.content,
type: req.type,
stream: stream,
connection: connection
}
this.dispatchEvent(
new CustomEvent(directMessageEvent, { detail })
)
} catch (e: any) {
stream?.abort(e)
throw e
} finally {
try {
await stream?.close({
signal: AbortSignal.timeout(5000),
})
} catch (err: any) {
stream?.abort(err)
throw err
}
}
}
}
export function directMessage() {
return (components: DirectMessageComponents) => {
return new DirectMessage(components)
}
}

View File

@@ -2,7 +2,7 @@ import {
createDelegatedRoutingV1HttpApiClient,
DelegatedRoutingV1HttpApiClient,
} from '@helia/delegated-routing-v1-http-api-client'
import { createLibp2p, Libp2p } from 'libp2p'
import { createLibp2p } from 'libp2p'
import { identify } from '@libp2p/identify'
import { peerIdFromString } from '@libp2p/peer-id'
import { noise } from '@chainsafe/libp2p-noise'
@@ -10,7 +10,7 @@ import { yamux } from '@chainsafe/libp2p-yamux'
import { bootstrap } from '@libp2p/bootstrap'
import { Multiaddr } from '@multiformats/multiaddr'
import { sha256 } from 'multiformats/hashes/sha2'
import type { Connection, Message, SignedMessage, PeerId } from '@libp2p/interface'
import type { Connection, Message, SignedMessage, PeerId, Libp2p } from '@libp2p/interface'
import { gossipsub } from '@chainsafe/libp2p-gossipsub'
import { webSockets } from '@libp2p/websockets'
import { webTransport } from '@libp2p/webtransport'
@@ -20,18 +20,23 @@ import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
import { BOOTSTRAP_PEER_IDS, CHAT_FILE_TOPIC, CHAT_TOPIC, PUBSUB_PEER_DISCOVERY } from './constants'
import first from 'it-first'
import { forComponent } from './logger'
import { directMessage } from './direct-message'
import type { Libp2pType } from '@/context/ctx'
const log = forComponent('libp2p')
export async function startLibp2p() {
export async function startLibp2p(): Promise<Libp2pType> {
// enable verbose logging in browser console to view debug logs
localStorage.debug = 'ui*,libp2p*,-libp2p:connection-manager*,-*:trace'
const delegatedClient = createDelegatedRoutingV1HttpApiClient('https://delegated-ipfs.dev')
const { bootstrapAddrs, relayListenAddrs } = await getBootstrapMultiaddrs(delegatedClient)
log('starting libp2p with bootstrapAddrs %o and relayListenAddrs: %o', bootstrapAddrs, relayListenAddrs)
const libp2p = await createLibp2p({
let libp2p: Libp2pType
libp2p = await createLibp2p({
addresses: {
listen: [
// 👇 Listen for webRTC connection
@@ -92,9 +97,15 @@ export async function startLibp2p() {
// This relies on the public delegated routing endpoint https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing
delegatedRouting: () => delegatedClient,
identify: identify(),
// Custom protocol for direct messaging
directMessage: directMessage(),
},
})
if (!libp2p) {
throw new Error('Failed to create libp2p node')
}
libp2p.services.pubsub.subscribe(CHAT_TOPIC)
libp2p.services.pubsub.subscribe(CHAT_FILE_TOPIC)

View File

@@ -0,0 +1,30 @@
syntax = "proto3";
package dm;
service DirectMessage {
rpc DirectMessage (DirectMessageRequest) returns (DirectMessageResponse) {}
}
message Metadata {
string clientVersion = 1; // client version
int64 timestamp = 2; // unix time
}
enum Status {
UNKNOWN = 0;
OK = 200;
ERROR = 500;
}
message DirectMessageRequest {
Metadata metadata = 1;
string content = 2;
string type = 3;
}
message DirectMessageResponse{
Metadata metadata = 1;
Status status = 2;
optional string statusText = 3;
}

View File

@@ -0,0 +1,356 @@
/* eslint-disable import/export */
/* eslint-disable complexity */
/* eslint-disable @typescript-eslint/no-namespace */
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */
import { type Codec, decodeMessage, type DecodeOptions, encodeMessage, enumeration, message } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'
export interface dm {}
export namespace dm {
export interface DirectMessage {}
export namespace DirectMessage {
let _codec: Codec<DirectMessage>
export const codec = (): Codec<DirectMessage> => {
if (_codec == null) {
_codec = message<DirectMessage>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<DirectMessage>): Uint8Array => {
return encodeMessage(obj, DirectMessage.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<DirectMessage>): DirectMessage => {
return decodeMessage(buf, DirectMessage.codec(), opts)
}
}
export interface Metadata {
clientVersion: string
timestamp: bigint
}
export namespace Metadata {
let _codec: Codec<Metadata>
export const codec = (): Codec<Metadata> => {
if (_codec == null) {
_codec = message<Metadata>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if ((obj.clientVersion != null && obj.clientVersion !== '')) {
w.uint32(10)
w.string(obj.clientVersion)
}
if ((obj.timestamp != null && obj.timestamp !== 0n)) {
w.uint32(16)
w.int64(obj.timestamp)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
clientVersion: '',
timestamp: 0n
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.clientVersion = reader.string()
break
}
case 2: {
obj.timestamp = reader.int64()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<Metadata>): Uint8Array => {
return encodeMessage(obj, Metadata.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<Metadata>): Metadata => {
return decodeMessage(buf, Metadata.codec(), opts)
}
}
export enum Status {
UNKNOWN = 'UNKNOWN',
OK = 'OK',
ERROR = 'ERROR'
}
enum __StatusValues {
UNKNOWN = 0,
OK = 200,
ERROR = 500
}
export namespace Status {
export const codec = (): Codec<Status> => {
return enumeration<Status>(__StatusValues)
}
}
export interface DirectMessageRequest {
metadata?: dm.Metadata
content: string
type: string
}
export namespace DirectMessageRequest {
let _codec: Codec<DirectMessageRequest>
export const codec = (): Codec<DirectMessageRequest> => {
if (_codec == null) {
_codec = message<DirectMessageRequest>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (obj.metadata != null) {
w.uint32(10)
dm.Metadata.codec().encode(obj.metadata, w)
}
if ((obj.content != null && obj.content !== '')) {
w.uint32(18)
w.string(obj.content)
}
if ((obj.type != null && obj.type !== '')) {
w.uint32(26)
w.string(obj.type)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
content: '',
type: ''
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.metadata = dm.Metadata.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.metadata
})
break
}
case 2: {
obj.content = reader.string()
break
}
case 3: {
obj.type = reader.string()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<DirectMessageRequest>): Uint8Array => {
return encodeMessage(obj, DirectMessageRequest.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<DirectMessageRequest>): DirectMessageRequest => {
return decodeMessage(buf, DirectMessageRequest.codec(), opts)
}
}
export interface DirectMessageResponse {
metadata?: dm.Metadata
status: dm.Status
statusText?: string
}
export namespace DirectMessageResponse {
let _codec: Codec<DirectMessageResponse>
export const codec = (): Codec<DirectMessageResponse> => {
if (_codec == null) {
_codec = message<DirectMessageResponse>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (obj.metadata != null) {
w.uint32(10)
dm.Metadata.codec().encode(obj.metadata, w)
}
if (obj.status != null && __StatusValues[obj.status] !== 0) {
w.uint32(16)
dm.Status.codec().encode(obj.status, w)
}
if (obj.statusText != null) {
w.uint32(26)
w.string(obj.statusText)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
status: Status.UNKNOWN
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.metadata = dm.Metadata.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.metadata
})
break
}
case 2: {
obj.status = dm.Status.codec().decode(reader)
break
}
case 3: {
obj.statusText = reader.string()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<DirectMessageResponse>): Uint8Array => {
return encodeMessage(obj, DirectMessageResponse.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<DirectMessageResponse>): DirectMessageResponse => {
return decodeMessage(buf, DirectMessageResponse.codec(), opts)
}
}
let _codec: Codec<dm>
export const codec = (): Codec<dm> => {
if (_codec == null) {
_codec = message<dm>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<dm>): Uint8Array => {
return encodeMessage(obj, dm.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<dm>): dm => {
return decodeMessage(buf, dm.codec(), opts)
}
}

View File

@@ -1,13 +1,11 @@
import Head from 'next/head'
import { CheckCircleIcon, XCircleIcon } from '@heroicons/react/20/solid'
import Nav from '@/components/nav'
import { useLibp2pContext } from '@/context/ctx'
import type { PeerUpdate, Connection } from '@libp2p/interface'
import { PeerId } from '@libp2p/interface'
import { useCallback, useEffect, useState } from 'react'
import Image from 'next/image'
import { Multiaddr, multiaddr } from '@multiformats/multiaddr'
import { connectToMultiaddr, getFormattedConnections } from '../lib/libp2p'
import { connectToMultiaddr } from '../lib/libp2p'
import Spinner from '@/components/spinner'
import PeerList from '@/components/peer-list'