Compare commits

...

18 Commits

Author SHA1 Message Date
Ludovic Chenut
9f90721d12 feat: add proc genUfrag to generate a random username string 2024-06-10 15:57:40 +02:00
Ludovic Chenut
8b9f34959b remove trailing space 2024-04-03 16:36:06 +02:00
Ludovic Chenut
c327762f47 Add comments & remove TODO already done 2024-03-22 14:41:31 +01:00
Ludovic Chenut
abd3653d56 update commit 2024-03-06 16:53:48 +01:00
Ludovic Chenut
afe2b08129 Fix a lot of small bugs 2024-02-15 16:10:20 +01:00
Ludovic Chenut
03ff023e94 fix webrtcstream 2024-02-05 17:44:35 +01:00
Ludovic Chenut
60d48e644b update pinned 2023-12-15 09:54:17 +01:00
Ludovic Chenut
58294ce156 update pinned 2023-12-07 10:00:16 +01:00
Ludovic Chenut
359a448c1b update pinned & fix localCertificate 2023-11-29 14:53:01 +01:00
Ludovic Chenut
7945cc754e add prologue/remote & local cert to the handshake 2023-11-23 15:00:56 +01:00
Ludovic Chenut
284188a74f update pinned 2023-11-16 16:05:41 +01:00
Ludovic Chenut
dab487eeb3 update pinned 2023-11-09 15:56:04 +01:00
Ludovic Chenut
ad43f41ad7 fix hashBtS 2023-11-07 10:27:36 +01:00
Ludovic Chenut
f350479824 update pinned + fixes 2023-10-24 17:20:27 +02:00
Ludovic Chenut
c6460ea7ce fixes pinned + webrtctransport 2023-10-19 12:12:56 +02:00
Tanguy
30e93e7c0a almost compiling 2023-10-13 18:08:09 +02:00
Tanguy
e0f2b00f9a WebRTC scaffolding 2023-10-11 16:18:53 +02:00
Tanguy
6ab779d30a MultiAddress support 2023-10-11 11:35:00 +02:00
10 changed files with 640 additions and 12 deletions

View File

@@ -1,17 +1,20 @@
bearssl;https://github.com/status-im/nim-bearssl@#e4157639db180e52727712a47deaefcbbac6ec86
binary_serialization;https://github.com/status-im/nim-binary-serialization.git@#38a73a70fd43f3835ca01a877353858b19e39d70
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
chronos;https://github.com/status-im/nim-chronos@#ba143e029f35fd9b4cd3d89d007cc834d0d5ba3c
dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8
faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309
httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
httputils;https://github.com/status-im/nim-http-utils@#87b7cbf032c90b9e6b446081f4a647e950362cec
json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
mbedtls;https://github.com/status-im/nim-mbedtls.git@#308f3edaa0edcc880b54ce22156fb2f4e2a2bcc7
metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
nimcrypto;https://github.com/cheatfate/nimcrypto@#a079df92424968d46a6ac258299ce9380aa153f2
results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad
secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be
serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35
stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e
testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c
webrtc;https://github.com/status-im/nim-webrtc.git@#d525da3d62ed65e989d782e4cbb7edf221128568
websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982
zlib;https://github.com/status-im/nim-zlib@#38b72eda9d70067df4a953f56b5ed59630f2a17b
zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5

View File

@@ -17,6 +17,7 @@ requires "nim >= 1.6.0",
"secp256k1",
"stew#head",
"websock",
"https://github.com/status-im/nim-webrtc.git",
"unittest2 >= 0.0.5 & <= 0.1.0"
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use

View File

@@ -297,6 +297,33 @@ proc dnsVB(vb: var VBuffer): bool =
if s.find('/') == -1:
result = true
proc certHashStB(s: string, vb: var VBuffer): bool =
## CertHash address stringToBuffer() implementation.
var data = MultiBase.decode(s).valueOr:
return false
var mh: MultiHash
if MultiHash.decode(data, mh).isOk:
vb.writeSeq(data)
result = true
proc certHashBtS(vb: var VBuffer, s: var string): bool =
## CertHash address bufferToString() implementation.
var address = newSeq[byte]()
if vb.readSeq(address) > 0:
var mh: MultiHash
if MultiHash.decode(address, mh).isOk:
s = MultiBase.encode("base64url", address).valueOr:
return false
result = true
proc certHashVB(vb: var VBuffer): bool =
## CertHash address validateBuffer() implementation.
var address = newSeq[byte]()
if vb.readSeq(address) > 0:
var mh: MultiHash
if MultiHash.decode(address, mh).isOk:
result = true
proc mapEq*(codec: string): MaPattern =
## ``Equal`` operator for pattern
result.operator = Eq
@@ -358,6 +385,11 @@ const
bufferToString: dnsBtS,
validateBuffer: dnsVB
)
TranscoderCertHash* = Transcoder(
stringToBuffer: certHashStB,
bufferToString: certHashBtS,
validateBuffer: certHashVB
)
ProtocolsList = [
MAProtocol(
mcodec: multiCodec("ip4"), kind: Fixed, size: 4,
@@ -458,7 +490,17 @@ const
),
MAProtocol(
mcodec: multiCodec("p2p-webrtc-direct"), kind: Marker, size: 0
)
),
MAProtocol(
mcodec: multiCodec("webrtc"), kind: Marker, size: 0
),
MAProtocol(
mcodec: multiCodec("webrtc-direct"), kind: Marker, size: 0
),
MAProtocol(
mcodec: multiCodec("certhash"), kind: Length, size: 0,
coder: TranscoderCertHash
),
]
DNSANY* = mapEq("dns")
@@ -489,6 +531,7 @@ const
WebSockets_DNS* = mapOr(WS_DNS, WSS_DNS)
WebSockets_IP* = mapOr(WS_IP, WSS_IP)
WebSockets* = mapOr(WS, WSS)
WebRtcDirect2* = mapAnd(UDP, mapEq("webrtc-direct"), mapEq("certhash"))
Onion3* = mapEq("onion3")
TcpOnion3* = mapAnd(TCP, Onion3)
@@ -512,7 +555,7 @@ const
mapAnd(DNS, mapEq("https"))
)
WebRTCDirect* = mapOr(
WebRTCDirect* {.deprecated.} = mapOr(
mapAnd(HTTP, mapEq("p2p-webrtc-direct")),
mapAnd(HTTPS, mapEq("p2p-webrtc-direct"))
)

View File

@@ -193,11 +193,14 @@ const MultiCodecList = [
("https", 0x01BB),
("tls", 0x01C0),
("quic", 0x01CC),
("certhash", 0x01D2),
("ws", 0x01DD),
("wss", 0x01DE),
("p2p-websocket-star", 0x01DF), # not in multicodec list
("p2p-webrtc-star", 0x0113), # not in multicodec list
("p2p-webrtc-direct", 0x0114), # not in multicodec list
("webrtc-direct", 0x0118),
("webrtc", 0x0119),
("onion", 0x01BC),
("onion3", 0x01BD),
("p2p-circuit", 0x0122),

View File

@@ -82,7 +82,7 @@ type
localPrivateKey: PrivateKey
localPublicKey: seq[byte]
noiseKeys: KeyPair
commonPrologue: seq[byte]
commonPrologue*: seq[byte]
outgoing: bool
NoiseConnection* = ref object of SecureConn

View File

@@ -172,7 +172,7 @@ method start*(
self.servers &= server
trace "Listening on", address = ma
trace "Listening on", address = self.addrs[i]
method stop*(self: TcpTransport) {.async, gcsafe.} =
## stop the transport

View File

@@ -0,0 +1,532 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
## WebRtc transport implementation
## For now, only support WebRtc direct (ie browser to server)
{.push raises: [].}
import std/[sequtils]
import stew/[endians2, byteutils, objects, results]
import chronos, chronicles
import transport,
../errors,
../wire,
../multicodec,
../multihash,
../multibase,
../protobuf/minprotobuf,
../connmanager,
../muxers/muxer,
../multiaddress,
../stream/connection,
../upgrademngrs/upgrade,
../protocols/secure/noise,
../utility
import webrtc/webrtc, webrtc/datachannel, webrtc/dtls/dtls
logScope:
topics = "libp2p webrtctransport"
export transport, results
const charset = toSeq("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789/+".items)
proc genUfrag*(rng: ref HmacDrbgContext, size: int): seq[byte] =
# https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md?plain=1#L73-L77
result = newSeq[byte](size)
for resultIndex in 0..<size:
let charsetIndex = rng[].generate(uint) mod charset.len()
result[resultIndex] = charset[charsetIndex].ord().uint8
const
WebRtcTransportTrackerName* = "libp2p.webrtctransport"
MaxMessageSize = 16384 # 16KiB; from the WebRtc-direct spec
# -- Message --
# Implementation of the libp2p's WebRTC message defined here:
# https://github.com/libp2p/specs/blob/master/webrtc/README.md?plain=1#L60-L79
type
MessageFlag = enum
## Flags to support half-closing and reset of streams.
## - Fin: Sender will no longer send messages
## - StopSending: Sender will no longer read messages.
## Received messages are discarded
## - ResetStream: Sender abruptly terminates the sending part of the stream.
## Receiver MAY discard any data that it already received on that stream
## - FinAck: Acknowledges the previous receipt of a message with the Fin flag set.
Fin = 0
StopSending = 1
ResetStream = 2
FinAck = 3
WebRtcMessage = object
flag: Opt[MessageFlag]
data: seq[byte]
proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] =
## Decoding WebRTC Message from raw data
var
pb = initProtoBuffer(bytes)
flagOrd: uint32
res: WebRtcMessage
if ? pb.getField(1, flagOrd).toOpt():
var flag: MessageFlag
if flag.checkedEnumAssign(flagOrd):
res.flag = Opt.some(flag)
discard ? pb.getField(2, res.data).toOpt()
Opt.some(res)
proc encode(msg: WebRtcMessage): seq[byte] =
## Encoding WebRTC Message to raw data
var pb = initProtoBuffer()
msg.flag.withValue(val):
pb.write(1, uint32(val))
if msg.data.len > 0:
pb.write(2, msg.data)
pb.finish()
pb.buffer
# -- Raw WebRTC Stream --
# All the data written to or read from a WebRtcStream should be length-prefixed
# so `readOnce`/`write` WebRtcStream implementation must either recode
# `readLP`/`writeLP`, or implement a `RawWebRtcStream` on which we can
# directly use `readLP` and `writeLP`. The second solution is the less redundant,
# so it's the one we've chosen.
type
RawWebRtcStream = ref object of Connection
dataChannel: DataChannelStream
readData: seq[byte]
proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream =
RawWebRtcStream(dataChannel: dataChannel)
method closeImpl*(s: RawWebRtcStream): Future[void] =
# TODO: close datachannel
discard
method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] =
trace "RawWebrtcStream write", msg, len=msg.len()
s.dataChannel.write(msg)
method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
# TODO:
# if s.isClosed:
# raise newLPStreamEOFError()
if s.readData.len() == 0:
let rawData = await s.dataChannel.read()
s.readData = rawData
trace "readOnce RawWebRtcStream", data = s.readData, nbytes
result = min(nbytes, s.readData.len)
copyMem(pbytes, addr s.readData[0], result)
s.readData = s.readData[result..^1]
# -- Stream --
type
WebRtcState = enum
Sending, Closing, Closed
WebRtcStream = ref object of Connection
rawStream: RawWebRtcStream
sendQueue: seq[(seq[byte], Future[void])]
sendLoop: Future[void]
readData: seq[byte]
txState: WebRtcState # Transmission
rxState: WebRtcState # Reception
proc new(
_: type WebRtcStream,
dataChannel: DataChannelStream,
oaddr: Opt[MultiAddress],
peerId: PeerId): WebRtcStream =
let stream = WebRtcStream(rawStream: RawWebRtcStream.new(dataChannel),
observedAddr: oaddr, peerId: peerId)
procCall Connection(stream).initStream()
stream
proc sender(s: WebRtcStream) {.async.} =
while s.sendQueue.len > 0:
let (message, fut) = s.sendQueue.pop()
#TODO handle exceptions
await s.rawStream.writeLp(message)
if not fut.isNil: fut.complete()
proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) =
let wrappedMessage = msg.encode()
s.sendQueue.insert((wrappedMessage, fut))
if s.sendLoop == nil or s.sendLoop.finished:
s.sendLoop = s.sender()
method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] =
# We need to make sure we send all of our data before another write
# Otherwise, two concurrent writes could get intertwined
# We avoid this by filling the s.sendQueue synchronously
var msg = msg2
trace "WebrtcStream write", msg, len=msg.len()
let retFuture = newFuture[void]("WebRtcStream.write")
if s.txState != Sending:
retFuture.fail(newLPStreamClosedError())
return retFuture
var messages: seq[seq[byte]]
while msg.len > MaxMessageSize - 16:
let
endOfMessage = MaxMessageSize - 16
wrappedMessage = WebRtcMessage(data: msg[0 ..< endOfMessage])
s.send(wrappedMessage)
msg = msg[endOfMessage .. ^1]
let
wrappedMessage = WebRtcMessage(data: msg)
s.send(wrappedMessage, retFuture)
return retFuture
proc actuallyClose(s: WebRtcStream) {.async.} =
debug "stream closed", rxState=s.rxState, txState=s.txState
if s.rxState == Closed and s.txState == Closed and s.readData.len == 0:
#TODO add support to DataChannel
#await s.dataChannel.close()
await procCall Connection(s).closeImpl()
method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
if s.rxState == Closed:
raise newLPStreamEOFError()
while s.readData.len == 0 or nbytes == 0:
# Check if there's no data left in readData or if nbytes is equal to 0
# in order to read an eventual Fin or FinAck
if s.rxState == Closed:
await s.actuallyClose()
return 0
let
#TODO handle exceptions
message = await s.rawStream.readLp(MaxMessageSize)
decoded = WebRtcMessage.decode(message).tryGet()
s.readData = s.readData.concat(decoded.data)
decoded.flag.withValue(flag):
case flag:
of Fin:
# Peer won't send any more data
s.rxState = Closed
s.send(WebRtcMessage(flag: Opt.some(FinAck)))
of FinAck:
s.txState = Closed
await s.actuallyClose()
if nbytes == 0:
return 0
else: discard
result = min(nbytes, s.readData.len)
copyMem(pbytes, addr s.readData[0], result)
s.readData = s.readData[result..^1]
method closeImpl*(s: WebRtcStream) {.async.} =
s.send(WebRtcMessage(flag: Opt.some(Fin)))
s.txState = Closing
while s.txState != Closed:
discard await s.readOnce(nil, 0)
# -- Connection --
type WebRtcConnection = ref object of Connection
connection: DataChannelConnection
remoteAddress: MultiAddress
method close*(conn: WebRtcConnection) {.async.} =
#TODO
discard
proc new(
_: type WebRtcConnection,
conn: DataChannelConnection,
observedAddr: Opt[MultiAddress]
): WebRtcConnection =
let co = WebRtcConnection(connection: conn, observedAddr: observedAddr)
procCall Connection(co).initStream()
co
proc getStream*(conn: WebRtcConnection,
direction: Direction,
noiseHandshake: bool = false): Future[WebRtcStream] {.async.} =
var datachannel =
case direction:
of Direction.In:
await conn.connection.accept()
of Direction.Out:
await conn.connection.openStream(noiseHandshake)
return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId)
# -- Muxer --
type WebRtcMuxer = ref object of Muxer
webRtcConn: WebRtcConnection
handleFut: Future[void]
method newStream*(m: WebRtcMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} =
return await m.webRtcConn.getStream(Direction.Out)
proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} =
try:
await m.streamHandler(chann)
trace "finished handling stream"
doAssert(chann.closed, "connection not closed by handler!")
except CatchableError as exc:
trace "Exception in mplex stream handler", msg = exc.msg
await chann.close()
#TODO add atEof
method handle*(m: WebRtcMuxer): Future[void] {.async, gcsafe.} =
try:
#while not m.webRtcConn.atEof:
while true:
let incomingStream = await m.webRtcConn.getStream(Direction.In)
asyncSpawn m.handleStream(incomingStream)
finally:
await m.webRtcConn.close()
method close*(m: WebRtcMuxer) {.async, gcsafe.} =
m.handleFut.cancel()
await m.webRtcConn.close()
# -- Upgrader --
type
WebRtcStreamHandler = proc(conn: Connection): Future[void] {.gcsafe, raises: [].}
WebRtcUpgrade = ref object of Upgrade
streamHandler: WebRtcStreamHandler
method upgrade*(
self: WebRtcUpgrade,
conn: Connection,
direction: Direction,
peerId: Opt[PeerId]): Future[Muxer] {.async.} =
let webRtcConn = WebRtcConnection(conn)
result = WebRtcMuxer(connection: conn, webRtcConn: webRtcConn)
# Noise handshake
let noiseHandler = self.secureManagers.filterIt(it of Noise)
assert noiseHandler.len > 0
let xx = "libp2p-webrtc-noise:".toBytes()
let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.localCertificate()).get().data.buffer
let remoteCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.remoteCertificate()).get().data.buffer
((Noise)noiseHandler[0]).commonPrologue = xx & remoteCert & localCert
echo "=> ", ((Noise)noiseHandler[0]).commonPrologue
let
stream = await webRtcConn.getStream(Out, true)
secureStream = await noiseHandler[0].handshake(
stream,
initiator = true, # we are always the initiator in webrtc-direct
peerId = peerId
)
# Peer proved its identity, we can close this
await secureStream.close()
await stream.close()
result.streamHandler = self.streamHandler
result.handler = result.handle()
# -- Transport --
type
WebRtcTransport* = ref object of Transport
connectionsTimeout: Duration
servers: seq[WebRtc]
acceptFuts: seq[Future[DataChannelConnection]]
clients: array[Direction, seq[DataChannelConnection]]
WebRtcTransportTracker* = ref object of TrackerBase
opened*: uint64
closed*: uint64
WebRtcTransportError* = object of transport.TransportError
proc setupWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe, raises: [].}
proc getWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe.} =
result = cast[WebRtcTransportTracker](getTracker(WebRtcTransportTrackerName))
if isNil(result):
result = setupWebRtcTransportTracker()
proc dumpTracking(): string {.gcsafe.} =
var tracker = getWebRtcTransportTracker()
result = "Opened tcp transports: " & $tracker.opened & "\n" &
"Closed tcp transports: " & $tracker.closed
proc leakTransport(): bool {.gcsafe.} =
var tracker = getWebRtcTransportTracker()
result = (tracker.opened != tracker.closed)
proc setupWebRtcTransportTracker(): WebRtcTransportTracker =
result = new WebRtcTransportTracker
result.opened = 0
result.closed = 0
result.dump = dumpTracking
result.isLeaked = leakTransport
addTracker(WebRtcTransportTrackerName, result)
proc new*(
T: typedesc[WebRtcTransport],
upgrade: Upgrade,
connectionsTimeout = 10.minutes): T {.public.} =
let upgrader = WebRtcUpgrade(ms: upgrade.ms, secureManagers: upgrade.secureManagers)
upgrader.streamHandler = proc(conn: Connection)
{.async, gcsafe, raises: [].} =
# TODO: replace echo by trace and find why it fails compiling
echo "Starting stream handler"#, conn
try:
await upgrader.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
raise exc
except CatchableError as exc:
echo "exception in stream handler", exc.msg#, conn, msg = exc.msg
finally:
await conn.closeWithEOF()
echo "Stream handler done"#, conn
let
transport = T(
upgrader: upgrader,
connectionsTimeout: connectionsTimeout)
return transport
method start*(
self: WebRtcTransport,
addrs: seq[MultiAddress]) {.async.} =
## listen on the transport
##
if self.running:
warn "WebRtc transport already running"
return
await procCall Transport(self).start(addrs)
trace "Starting WebRtc transport"
inc getWebRtcTransportTracker().opened
for i, ma in addrs:
if not self.handles(ma):
trace "Invalid address detected, skipping!", address = ma
continue
let
transportAddress = initTAddress(ma[0..1].tryGet()).tryGet()
server = WebRtc.new(transportAddress)
server.listen()
self.servers &= server
let
cert = server.dtls.localCertificate()
certHash = MultiHash.digest("sha2-256", cert).get().data.buffer
encodedCertHash = MultiBase.encode("base64", certHash).get()
self.addrs[i] = MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() &
MultiAddress.init(multiCodec("webrtc-direct")).tryGet() &
MultiAddress.init(multiCodec("certhash"), certHash).tryGet()
trace "Listening on", address = self.addrs[i]
proc connHandler(
self: WebRtcTransport,
client: DataChannelConnection,
observedAddr: Opt[MultiAddress],
dir: Direction
): Future[Connection] {.async.} =
trace "Handling webrtc connection", address = $observedAddr, dir = $dir,
clients = self.clients[Direction.In].len + self.clients[Direction.Out].len
let conn: Connection =
WebRtcConnection.new(
conn = client,
# dir = dir,
observedAddr = observedAddr
# timeout = self.connectionsTimeout
)
proc onClose() {.async.} =
try:
let futs = @[conn.join(), conn.join()] #TODO that's stupid
await futs[0] or futs[1]
for f in futs:
if not f.finished: await f.cancelAndWait() # cancel outstanding join()
trace "Cleaning up client"# TODO ?: , addrs = $client.remoteAddress,
# conn
self.clients[dir].keepItIf( it != client )
#TODO
#await allFuturesThrowing(
# conn.close(), client.closeWait())
except CatchableError as exc:
let useExc {.used.} = exc
debug "Error cleaning up client", errMsg = exc.msg, conn
self.clients[dir].add(client)
asyncSpawn onClose()
return conn
method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} =
if not self.running:
raise newTransportClosedError()
#TODO handle errors
if self.acceptFuts.len <= 0:
self.acceptFuts = self.servers.mapIt(it.accept())
if self.acceptFuts.len <= 0:
return
let
finished = await one(self.acceptFuts)
index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.servers[index].accept()
trace "Accept WebRTC Transport"
let transp = await finished
try:
#TODO add remoteAddress to DataChannelConnection
#let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() #TODO add /webrtc-direct
let observedAddr = MultiAddress.init("/ip4/127.0.0.1").tryGet()
return await self.connHandler(transp, Opt.some(observedAddr), Direction.In)
except CancelledError as exc:
#TODO
#transp.close()
raise exc
except CatchableError as exc:
debug "Failed to handle connection", exc = exc.msg
#TODO
#transp.close()
method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return WebRtcDirect2.match(address)

View File

@@ -171,8 +171,6 @@ method start*(
trace "Listening on", addresses = self.addrs
self.running = true
method stop*(self: WsTransport) {.async, gcsafe.} =
## stop the transport
##

View File

@@ -147,7 +147,9 @@ const
"/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234",
"/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095/tcp/8000/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC",
"/p2p-webrtc-star/ip4/127.0.0.1/tcp/9090/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC",
"/ip4/127.0.0.1/tcp/9090/p2p-circuit/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC"
"/ip4/127.0.0.1/tcp/9090/p2p-circuit/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC",
"/ip4/127.0.0.1/udp/1234/webrtc-direct",
"/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g",
]
RustSuccessExpects = [
@@ -177,7 +179,9 @@ const
"047F000001A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B0604D2",
"29200108A07AC542013AC986FFFE317095061F40DD03A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B",
"9302047F000001062382DD03A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B",
"047F000001062382A202A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B"
"047F000001062382A202A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B",
"047F000001910204D29802",
"047F000001910204D29802D203221220C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2"
]
RustFailureVectors = [
@@ -211,7 +215,9 @@ const
"/ip4/127.0.0.1/tcp",
"/ip4/127.0.0.1/ipfs",
"/ip4/127.0.0.1/ipfs/tcp",
"/p2p-circuit/50"
"/p2p-circuit/50",
"/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash/b2uaraocy6yrdblb4sfptaddgimjmmp",
"/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash"
]
PathVectors = [

42
testwebrtc.nim Normal file
View File

@@ -0,0 +1,42 @@
import chronos, libp2p, libp2p/transports/webrtctransport
import stew/byteutils
proc echoHandler(conn: Connection, proto: string) {.async.} =
defer: await conn.close()
while true:
try:
echo "\e[35;1m => Echo Handler <=\e[0m"
var xx = newSeq[byte](1024)
let aa = await conn.readOnce(addr xx[0], 1024)
xx = xx[0..<aa]
let msg = string.fromBytes(xx)
echo " => Echo Handler Receive: ", msg, " <="
echo " => Echo Handler Try Send: ", msg & "1", " <="
await conn.write(msg & "1")
except CatchableError as e:
echo " => Echo Handler Error: ", e.msg, " <="
break
proc main {.async.} =
let switch =
SwitchBuilder.new()
.withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g").tryGet()) #TODO the certhash shouldn't be necessary
.withRng(crypto.newRng())
.withMplex()
.withYamux()
.withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr))
.withNoise()
.build()
let
codec = "/echo/1.0.0"
proto = new LPProtocol
proto.handler = echoHandler
proto.codec = codec
switch.mount(proto)
await switch.start()
echo "\e[31;1m", $(switch.peerInfo.addrs[0]), "/p2p/", $(switch.peerInfo.peerId), "\e[0m"
await sleepAsync(1.hours)
waitFor main()