Add file exchange proto

This commit is contained in:
Gabriel Cruz
2025-08-11 17:04:21 -03:00
parent fe92d99c7e
commit 8de68caac6
5 changed files with 73 additions and 35 deletions

View File

@@ -10,4 +10,4 @@ bin = @["nim_peer"]
# Dependencies
requires "nim >= 2.2.0", "nimwave", "chronos", "libp2p", "illwill", "cligen"
requires "nim >= 2.2.0", "nimwave", "chronos", "libp2p", "illwill", "cligen", "stew"

View File

@@ -0,0 +1,34 @@
import os, strutils, sequtils
import libp2p, chronos, stew/byteutils
import ./utils
const
MaxFileSize: int = 1024 # 1KiB
MaxFileIdSize: int = 1024 # 1KiB
FileExchangeCodec*: string = "/universal-connectivity-file/1"
type FileExchange* = ref object of LPProtocol
proc new*(T: typedesc[FileExchange]): T =
proc handle(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
let fileId = string.fromBytes(await conn.readLp(MaxFileIdSize))
# TODO: this is unsafe, fix it
let filename = "/tmp/" & fileId
# if /tmp/{fileid} exists
if fileExists(filename):
let fileContent = cast[seq[byte]](readFile(filename))
await conn.writeLp(fileContent)
except CancelledError as e:
raise e
except CatchableError as e:
echo "exception in handler", e.msg
finally:
await conn.close()
return T.new(codecs = @[FileExchangeCodec], handler = handle)
proc requestFile*(p: FileExchange, conn: Connection, fileId: string): Future[seq[byte]] {.async.} =
await conn.writeLp(cast[seq[byte]](fileId))
await conn.readLp(MaxFileSize)

View File

@@ -1,16 +1,16 @@
from illwave as iw import nil, `[]`, `[]=`, `==`, width, height
from terminal import nil
import std/sha1
import chronos, tables, deques, strutils, sequtils
import libp2p, cligen
from libp2p/protocols/pubsub/rpc/message import Message
import ./ui/root
import ./utils
import ./file_exchange
const MAX_FILE_SIZE: int = 1024 # 1KiB
proc start(peerId: PeerId, addrs: seq[MultiAddress]) {.async.} =
proc start(peerId: PeerId, addrs: seq[MultiAddress], room: string) {.async.} =
# setup peer
let switch = SwitchBuilder
.new()
@@ -22,6 +22,10 @@ proc start(peerId: PeerId, addrs: seq[MultiAddress]) {.async.} =
let gossip =
GossipSub.init(switch = switch, triggerSelf = true, verifySignature = false)
switch.mount(gossip)
let fileExchange = FileExchange.new()
switch.mount(fileExchange)
await switch.start()
let recvQ = newAsyncQueue[string]()
@@ -48,18 +52,17 @@ proc start(peerId: PeerId, addrs: seq[MultiAddress]) {.async.} =
let onNewFile = proc(
topic: string, msg: Message
): Future[ValidationResult] {.async, gcsafe.} =
let fileId = msg.data
let fileId = cast[string](msg.data)
# use known addresses since we can't use kad to get peer addrs
# this means that we're unable to get files from a peer to which we don't have the addresses
let conn = await switch.dial(msg.fromPeer, addrs, "/universal-connectivity-file/1")
defer: await conn.close()
# Request file
await conn.writeLp(fileId)
# Read file contents
let fileContents = await conn.readLp(MAX_FILE_SIZE)
# TODO: do sth with file
let strFile = cast[string](fileContents)
echo "downloaded file: " & strFile
let conn = await switch.dial(msg.fromPeer, addrs, FileExchangeCodec)
# TODO: path appending is unsafe (sanitize)
let filePath = "/tmp/" & fileId
let fileContents = await fileExchange.requestFile(conn, fileId)
writeFile(filePath, fileContents)
await conn.close()
# Save file in /tmp/fileId
echo "Downloaded file to " & filePath
return ValidationResult.Accept
# when a new peer is announced
@@ -70,18 +73,18 @@ proc start(peerId: PeerId, addrs: seq[MultiAddress]) {.async.} =
# register validators and handlers
# chat messages
gossip.subscribe(GOSSIPSUB_CHAT_TOPIC, nil)
gossip.addValidator(GOSSIPSUB_CHAT_TOPIC, onChatMsg)
gossip.subscribe(room, nil)
gossip.addValidator(room, onChatMsg)
# files
gossip.subscribe(GOSSIPSUB_CHAT_FILE_TOPIC, nil)
gossip.addValidator(GOSSIPSUB_CHAT_FILE_TOPIC, onNewFile)
gossip.subscribe(ChatFileTopic, nil)
gossip.addValidator(ChatFileTopic, onNewFile)
# peer discovery
gossip.subscribe(GOSSIPSUB_PEER_DISCOVERY_TOPIC, onNewPeer)
gossip.subscribe(PeerDiscoveryTopic, onNewPeer)
try:
await runUI(gossip, recvQ, peerQ, switch.peerInfo.peerId)
await runUI(gossip, room, recvQ, peerQ, switch.peerInfo.peerId)
iw.deinit()
except Exception as exc:
echo "runUI error: " & exc.msg
@@ -91,13 +94,13 @@ proc start(peerId: PeerId, addrs: seq[MultiAddress]) {.async.} =
await switch.stop()
terminal.showCursor()
proc cli(args: seq[string]) =
proc cli(room=ChatTopic, args: seq[string]) =
if args.len < 2:
echo "usage: nimble run -- <peer_id> <multiaddress>, [<multiaddresss>, ...]"
return
let peerId = PeerId.init(args[0]).get()
let addrs = args[1 ..^ 1].mapIt(MultiAddress.init(it).get())
waitFor start(peerId, addrs)
waitFor start(peerId, addrs, room)
when isMainModule:
dispatch cli

View File

@@ -16,8 +16,8 @@ type State = object
include nimwave/prelude
const
PEERS_PANEL_WIDTH: int = 10
TOP_HEIGHT: int = 15
PeersPanelWidth: int = 10
TopHeight: int = 15
type
PeersPanel = ref object of nw.Node
@@ -26,11 +26,11 @@ type
method render(node: ChatPanel, ctx: var nw.Context[State]) =
let width =
if PEERS_PANEL_WIDTH < iw.width(ctx.tb):
iw.width(ctx.tb) - PEERS_PANEL_WIDTH
if PeersPanelWidth < iw.width(ctx.tb):
iw.width(ctx.tb) - PeersPanelWidth
else:
iw.width(ctx.tb)
ctx = nw.slice(ctx, 0, 0, width, TOP_HEIGHT)
ctx = nw.slice(ctx, 0, 0, width, TopHeight)
render(
nw.Box(
border: nw.Border.Single,
@@ -41,8 +41,8 @@ method render(node: ChatPanel, ctx: var nw.Context[State]) =
)
method render(node: PeersPanel, ctx: var nw.Context[State]) =
let width = PEERS_PANEL_WIDTH
let height = TOP_HEIGHT
let width = PeersPanelWidth
let height = TopHeight
ctx = nw.slice(ctx, 0, 0, width, height)
render(
nw.Box(
@@ -56,8 +56,8 @@ method render(node: PeersPanel, ctx: var nw.Context[State]) =
method render(node: SystemPanel, ctx: var nw.Context[State]) =
let width = iw.width(ctx.tb)
let height =
if TOP_HEIGHT < iw.height(ctx.tb):
iw.height(ctx.tb) - TOP_HEIGHT
if TopHeight < iw.height(ctx.tb):
iw.height(ctx.tb) - TopHeight
else:
iw.height(ctx.tb)
ctx = nw.slice(ctx, 0, 0, width, height)
@@ -72,6 +72,7 @@ method render(node: SystemPanel, ctx: var nw.Context[State]) =
proc runUI*(
gossip: GossipSub,
room: string,
recvQ: AsyncQueue[string],
peerQ: AsyncQueue[PeerId],
myPeerId: PeerId,
@@ -128,7 +129,7 @@ proc runUI*(
# TODO: handle /file command to send/publish files
# /file filename (registers ID in local database, sends fileId, handles incoming file requests)
discard await gossip.publish(
GOSSIPSUB_CHAT_TOPIC, cast[seq[byte]](@(ctx.data.inputBuffer))
room, cast[seq[byte]](@(ctx.data.inputBuffer))
)
ctx.data.messages.add("You: " & ctx.data.inputBuffer) # show message in ui
ctx.data.inputBuffer = "" # clear input buffer

View File

@@ -1,7 +1,7 @@
const
GOSSIPSUB_CHAT_TOPIC*: string = "universal-connectivity"
GOSSIPSUB_CHAT_FILE_TOPIC*: string = "universal-connectivity-file"
GOSSIPSUB_PEER_DISCOVERY_TOPIC*: string =
ChatTopic*: string = "universal-connectivity"
ChatFileTopic*: string = "universal-connectivity-file"
PeerDiscoveryTopic*: string =
"universal-connectivity-browser-peer-discovery"
import libp2p