Compare commits

...

26 Commits

Author SHA1 Message Date
Ludovic Chenut
6df4a53ec3 fix: password provider 2024-11-15 14:07:38 +01:00
Ludovic Chenut
255bf740ea feat: webrtc-direct end to end 2024-11-08 15:06:56 +01:00
Ludovic Chenut
5af460d7fd Merge remote-tracking branch 'origin/master' into webrtc-direct-e2e 2024-10-25 14:01:15 +02:00
Ludovic Chenut
b0ce2a428f feat: change .pinned 2024-10-24 13:20:00 +02:00
Álex
a3b8729cbe fix(ci): Daily workflows report (#1200)
Fix the daily workflows showing a green tick even when jobs failed.

Based off of: https://stackoverflow.com/a/58859404

Closes: https://github.com/vacp2p/nim-libp2p/issues/1197

---------

Co-authored-by: Diego <diego@status.im>
2024-10-10 12:10:49 +00:00
Álex
6c970911f2 fix(CI): free disk space on interop transport job (#1206)
Re-add the free disk space step, as space issues still occur when
building images.

---------

Co-authored-by: Diego <diego@status.im>
2024-10-10 09:42:25 +00:00
Álex
5d48776b02 chore(ci): Enable S3 caching for interop (#1193)
- Add our S3 bucket for caching Docker images, as Protocol Labs shut
down their shared one.
- Remove the free disk space workaround that kept the jobs from
failing when the images used too much space.

---------

Co-authored-by: diegomrsantos <diego@status.im>
2024-09-26 09:56:09 +00:00
Simon-Pierre Vivier
d389d96789 feat: rendezvous refactor (#1183)
Hello!

This PR aims to refactor the rendezvous code so that it is easier to
implement the Waku rendezvous strategy. The hardcoded minimum and maximum
TTLs were out of range for what we needed, and specifying which peers to
interact with is also necessary, since Waku deals with peers on multiple
separate shards.

I tried to keep the changes to a minimum; in particular, I did not change
the name of any public procs, which results in less-than-descriptive names
in some cases. I also wanted to return Results instead of raising
exceptions, but didn't. Would it be acceptable to do so?

Please advise on best practices, thank you.

---------

Co-authored-by: Ludovic Chenut <ludovic@status.im>
2024-09-25 09:11:57 +00:00
Ludovic Chenut
9f90721d12 feat: add proc genUfrag to generate a random username string 2024-06-10 15:57:40 +02:00
Ludovic Chenut
8b9f34959b remove trailing space 2024-04-03 16:36:06 +02:00
Ludovic Chenut
c327762f47 Add comments & remove TODO already done 2024-03-22 14:41:31 +01:00
Ludovic Chenut
abd3653d56 update commit 2024-03-06 16:53:48 +01:00
Ludovic Chenut
afe2b08129 Fix a lot of small bugs 2024-02-15 16:10:20 +01:00
Ludovic Chenut
03ff023e94 fix webrtcstream 2024-02-05 17:44:35 +01:00
Ludovic Chenut
60d48e644b update pinned 2023-12-15 09:54:17 +01:00
Ludovic Chenut
58294ce156 update pinned 2023-12-07 10:00:16 +01:00
Ludovic Chenut
359a448c1b update pinned & fix localCertificate 2023-11-29 14:53:01 +01:00
Ludovic Chenut
7945cc754e add prologue/remote & local cert to the handshake 2023-11-23 15:00:56 +01:00
Ludovic Chenut
284188a74f update pinned 2023-11-16 16:05:41 +01:00
Ludovic Chenut
dab487eeb3 update pinned 2023-11-09 15:56:04 +01:00
Ludovic Chenut
ad43f41ad7 fix hashBtS 2023-11-07 10:27:36 +01:00
Ludovic Chenut
f350479824 update pinned + fixes 2023-10-24 17:20:27 +02:00
Ludovic Chenut
c6460ea7ce fixes pinned + webrtctransport 2023-10-19 12:12:56 +02:00
Tanguy
30e93e7c0a almost compiling 2023-10-13 18:08:09 +02:00
Tanguy
e0f2b00f9a WebRTC scaffolding 2023-10-11 16:18:53 +02:00
Tanguy
6ab779d30a MultiAddress support 2023-10-11 11:35:00 +02:00
10 changed files with 752 additions and 41 deletions

View File

@@ -60,7 +60,6 @@ jobs:
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.branch }})'
runs-on: ${{ matrix.platform.builder }}
continue-on-error: ${{ matrix.nim.branch == 'devel' || matrix.nim.branch == 'version-2-0' }}
steps:
- name: Checkout
uses: actions/checkout@v4

View File

@@ -17,8 +17,9 @@ jobs:
name: Run transport interoperability tests
runs-on: ubuntu-22.04
steps:
- name: Free Disk Space (Ubuntu)
# For some reason the original job (libp2p/test-plans) has enough disk space, but this one doesn't.
- name: Free Disk Space
# For some reason we run out of disk space while running this action, likely while building the image.
# This action will free up some space to avoid the issue.
uses: jlumbroso/free-disk-space@v1.3.1
with:
tool-cache: true
@@ -32,6 +33,10 @@ jobs:
with:
test-filter: nim-libp2p-head
extra-versions: ${{ github.workspace }}/tests/transport-interop/version.json
s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }}
s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }}
s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }}
run-hole-punching-interop:
name: Run hole-punching interoperability tests
@@ -46,3 +51,7 @@ jobs:
with:
test-filter: nim-libp2p-head
extra-versions: ${{ github.workspace }}/tests/hole-punching-interop/version.json
s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }}
s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }}
s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }}

View File

@@ -1,10 +1,12 @@
bearssl;https://github.com/status-im/nim-bearssl@#667b40440a53a58e9f922e29e20818720c62d9ac
binary_serialization;https://github.com/status-im/nim-binary-serialization@#38a73a70fd43f3835ca01a877353858b19e39d70
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
chronos;https://github.com/status-im/nim-chronos@#c04576d829b8a0a1b12baaa8bc92037501b3a4a0
dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8
faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309
httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
httputils;https://github.com/status-im/nim-http-utils@#87b7cbf032c90b9e6b446081f4a647e950362cec
json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
mbedtls;https://github.com/status-im/nim-mbedtls.git@#740fb2f469511adc1772c5cb32395f4076b9e0c5
metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
ngtcp2;https://github.com/status-im/nim-ngtcp2@#6834f4756b6af58356ac9c4fef3d71db3c3ae5fe
nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
@@ -15,5 +17,7 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499
stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e
testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c
usrsctp;https://github.com/status-im/nim-usrsctp@#c6a8d4bab44447df790e97dfc8099f7af93d435e
webrtc;https://github.com/status-im/nim-webrtc.git@#497aea7e6c2e73d81456e60f26a49281d0b8c87f
websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982
zlib;https://github.com/status-im/nim-zlib@#38b72eda9d70067df4a953f56b5ed59630f2a17b
zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5

View File

@@ -318,6 +318,33 @@ proc dnsVB(vb: var VBuffer): bool =
## DNS name validateBuffer() implementation.
pathValidateBufferNoSlash(vb)
proc certHashStB(s: string, vb: var VBuffer): bool =
## CertHash address stringToBuffer() implementation.
var data = MultiBase.decode(s).valueOr:
return false
var mh: MultiHash
if MultiHash.decode(data, mh).isOk:
vb.writeSeq(data)
result = true
proc certHashBtS(vb: var VBuffer, s: var string): bool =
## CertHash address bufferToString() implementation.
var address = newSeq[byte]()
if vb.readSeq(address) > 0:
var mh: MultiHash
if MultiHash.decode(address, mh).isOk:
s = MultiBase.encode("base64url", address).valueOr:
return false
result = true
proc certHashVB(vb: var VBuffer): bool =
## CertHash address validateBuffer() implementation.
var address = newSeq[byte]()
if vb.readSeq(address) > 0:
var mh: MultiHash
if MultiHash.decode(address, mh).isOk:
result = true
proc mapEq*(codec: string): MaPattern =
## ``Equal`` operator for pattern
result.operator = Eq
@@ -355,6 +382,11 @@ const
)
TranscoderDNS* =
Transcoder(stringToBuffer: dnsStB, bufferToString: dnsBtS, validateBuffer: dnsVB)
TranscoderCertHash* = Transcoder(
stringToBuffer: certHashStB,
bufferToString: certHashBtS,
validateBuffer: certHashVB
)
ProtocolsList = [
MAProtocol(mcodec: multiCodec("ip4"), kind: Fixed, size: 4, coder: TranscoderIP4),
MAProtocol(mcodec: multiCodec("tcp"), kind: Fixed, size: 2, coder: TranscoderPort),
@@ -393,6 +425,9 @@ const
MAProtocol(mcodec: multiCodec("p2p-websocket-star"), kind: Marker, size: 0),
MAProtocol(mcodec: multiCodec("p2p-webrtc-star"), kind: Marker, size: 0),
MAProtocol(mcodec: multiCodec("p2p-webrtc-direct"), kind: Marker, size: 0),
MAProtocol(mcodec: multiCodec("webrtc"), kind: Marker, size: 0),
MAProtocol(mcodec: multiCodec("webrtc-direct"), kind: Marker, size: 0),
MAProtocol(mcodec: multiCodec("certhash"), kind: Length, size: 0, coder: TranscoderCertHash),
]
DNSANY* = mapEq("dns")
@@ -447,10 +482,12 @@ const
mapAnd(TCP, mapEq("https")), mapAnd(IP, mapEq("https")), mapAnd(DNS, mapEq("https"))
)
WebRTCDirect* = mapOr(
WebRTCDirect* {.deprecated.} = mapOr(
mapAnd(HTTP, mapEq("p2p-webrtc-direct")), mapAnd(HTTPS, mapEq("p2p-webrtc-direct"))
)
WebRTCDirect2* = mapAnd(UDP, mapEq("webrtc-direct"), mapEq("certhash"))
CircuitRelay* = mapEq("p2p-circuit")
proc initMultiAddressCodeTable(): Table[MultiCodec, MAProtocol] {.compileTime.} =

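For reference, a minimal sketch of how the new pattern and transcoder fit together (assuming `stew/results` for `tryGet`, and reusing the certhash from the test file at the end of this compare):

import stew/results
import libp2p/multiaddress

# webrtc-direct addresses are UDP-based and carry the server's certificate hash
let ma = MultiAddress.init(
  "/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/" &
  "uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g"
).tryGet()
assert WebRTCDirect2.match(ma) # the old WebRTCDirect pattern is now deprecated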
View File

@@ -387,11 +387,14 @@ const MultiCodecList = [
("tls", 0x01C0),
("quic", 0x01CC),
("quic-v1", 0x01CD),
("certhash", 0x01D2),
("ws", 0x01DD),
("wss", 0x01DE),
("p2p-websocket-star", 0x01DF), # not in multicodec list
("p2p-webrtc-star", 0x0113), # not in multicodec list
("p2p-webrtc-direct", 0x0114), # not in multicodec list
("webrtc-direct", 0x0118),
("webrtc", 0x0119),
("onion", 0x01BC),
("onion3", 0x01BD),
("p2p-circuit", 0x0122),

View File

@@ -35,8 +35,6 @@ const
RendezVousCodec* = "/rendezvous/1.0.0"
MinimumDuration* = 2.hours
MaximumDuration = 72.hours
MinimumTTL = MinimumDuration.seconds.uint64
MaximumTTL = MaximumDuration.seconds.uint64
RegistrationLimitPerPeer = 1000
DiscoverLimit = 1000'u64
SemaphoreDefaultSize = 5
@@ -320,6 +318,10 @@ type
peers: seq[PeerId]
cookiesSaved: Table[PeerId, Table[string, seq[byte]]]
switch: Switch
minDuration: Duration
maxDuration: Duration
minTTL: uint64
maxTTL: uint64
proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] =
if spr.len == 0:
@@ -395,7 +397,7 @@ proc save(
rdv.registered.add(
RegisteredData(
peerId: peerId,
expiration: Moment.now() + r.ttl.get(MinimumTTL).int64.seconds,
expiration: Moment.now() + r.ttl.get(rdv.minTTL).int64.seconds,
data: r,
)
)
@@ -409,8 +411,8 @@ proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
libp2p_rendezvous_register.inc()
if r.ns.len notin 1 .. 255:
return conn.sendRegisterResponseError(InvalidNamespace)
let ttl = r.ttl.get(MinimumTTL)
if ttl notin MinimumTTL .. MaximumTTL:
let ttl = r.ttl.get(rdv.minTTL)
if ttl notin rdv.minTTL .. rdv.maxTTL:
return conn.sendRegisterResponseError(InvalidTTL)
let pr = checkPeerRecord(r.signedPeerRecord, conn.peerId)
if pr.isErr():
@@ -506,24 +508,35 @@ proc advertisePeer(rdv: RendezVous, peer: PeerId, msg: seq[byte]) {.async.} =
await rdv.sema.acquire()
discard await advertiseWrap().withTimeout(5.seconds)
method advertise*(
rdv: RendezVous, ns: string, ttl: Duration = MinimumDuration
) {.async, base.} =
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
raise newException(RendezVousError, "Wrong Signed Peer Record")
proc advertise*(
rdv: RendezVous, ns: string, ttl: Duration, peers: seq[PeerId]
) {.async.} =
if ns.len notin 1 .. 255:
raise newException(RendezVousError, "Invalid namespace")
if ttl notin MinimumDuration .. MaximumDuration:
raise newException(RendezVousError, "Invalid time to live")
if ttl notin rdv.minDuration .. rdv.maxDuration:
raise newException(RendezVousError, "Invalid time to live: " & $ttl)
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
raise newException(RendezVousError, "Wrong Signed Peer Record")
let
r = Register(ns: ns, signedPeerRecord: sprBuff, ttl: Opt.some(ttl.seconds.uint64))
msg = encode(Message(msgType: MessageType.Register, register: Opt.some(r)))
rdv.save(ns, rdv.switch.peerInfo.peerId, r)
let fut = collect(newSeq()):
for peer in rdv.peers:
let futs = collect(newSeq()):
for peer in peers:
trace "Send Advertise", peerId = peer, ns
rdv.advertisePeer(peer, msg.buffer)
await allFutures(fut)
await allFutures(futs)
method advertise*(
rdv: RendezVous, ns: string, ttl: Duration = rdv.minDuration
) {.async, base.} =
await rdv.advertise(ns, ttl, rdv.peers)
proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
let
@@ -540,9 +553,8 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
@[]
proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
): Future[seq[PeerRecord]] {.async.} =
let nsSalted = ns & rdv.salt
var
s: Table[PeerId, (PeerRecord, Register)]
limit: uint64
@@ -587,8 +599,8 @@ proc request*(
for r in resp.registrations:
if limit == 0:
return
let ttl = r.ttl.get(MaximumTTL + 1)
if ttl > MaximumTTL:
let ttl = r.ttl.get(rdv.maxTTL + 1)
if ttl > rdv.maxTTL:
continue
let
spr = SignedPeerRecord.decode(r.signedPeerRecord).valueOr:
@@ -596,7 +608,7 @@ proc request*(
pr = spr.data
if s.hasKey(pr.peerId):
let (prSaved, rSaved) = s[pr.peerId]
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(MaximumTTL) < ttl) or
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(rdv.maxTTL) < ttl) or
prSaved.seqNo < pr.seqNo:
s[pr.peerId] = (pr, r)
else:
@@ -605,8 +617,6 @@ proc request*(
for (_, r) in s.values():
rdv.save(ns, peer, r, false)
# copy to avoid resizes during the loop
let peers = rdv.peers
for peer in peers:
if limit == 0:
break
@@ -621,6 +631,11 @@ proc request*(
trace "exception catch in request", description = exc.msg
return toSeq(s.values()).mapIt(it[0])
proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
): Future[seq[PeerRecord]] {.async.} =
await rdv.request(ns, l, rdv.peers)
proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
let nsSalted = ns & rdv.salt
try:
@@ -630,16 +645,15 @@ proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
except KeyError:
return
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
# TODO: find a way to improve this, maybe something similar to the advertise
proc unsubscribe*(rdv: RendezVous, ns: string, peerIds: seq[PeerId]) {.async.} =
if ns.len notin 1 .. 255:
raise newException(RendezVousError, "Invalid namespace")
rdv.unsubscribeLocally(ns)
let msg = encode(
Message(msgType: MessageType.Unregister, unregister: Opt.some(Unregister(ns: ns)))
)
proc unsubscribePeer(rdv: RendezVous, peerId: PeerId) {.async.} =
proc unsubscribePeer(peerId: PeerId) {.async.} =
try:
let conn = await rdv.switch.dial(peerId, RendezVousCodec)
defer:
@@ -648,8 +662,16 @@ proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
except CatchableError as exc:
trace "exception while unsubscribing", description = exc.msg
for peer in rdv.peers:
discard await rdv.unsubscribePeer(peer).withTimeout(5.seconds)
let futs = collect(newSeq()):
for peer in peerIds:
unsubscribePeer(peer)
discard await allFutures(futs).withTimeout(5.seconds)
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
rdv.unsubscribeLocally(ns)
await rdv.unsubscribe(ns, rdv.peers)
proc setup*(rdv: RendezVous, switch: Switch) =
rdv.switch = switch
@@ -662,7 +684,25 @@ proc setup*(rdv: RendezVous, switch: Switch) =
rdv.switch.addPeerEventHandler(handlePeer, Joined)
rdv.switch.addPeerEventHandler(handlePeer, Left)
proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
proc new*(
T: typedesc[RendezVous],
rng: ref HmacDrbgContext = newRng(),
minDuration = MinimumDuration,
maxDuration = MaximumDuration,
): T {.raises: [RendezVousError].} =
if minDuration < 1.minutes:
raise newException(RendezVousError, "TTL too short: 1 minute minimum")
if maxDuration > 72.hours:
raise newException(RendezVousError, "TTL too long: 72 hours maximum")
if minDuration >= maxDuration:
raise newException(RendezVousError, "Minimum TTL longer than maximum")
let
minTTL = minDuration.seconds.uint64
maxTTL = maxDuration.seconds.uint64
let rdv = T(
rng: rng,
salt: string.fromBytes(generateBytes(rng[], 8)),
@@ -670,6 +710,10 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
defaultDT: Moment.now() - 1.days,
#registerEvent: newAsyncEvent(),
sema: newAsyncSemaphore(SemaphoreDefaultSize),
minDuration: minDuration,
maxDuration: maxDuration,
minTTL: minTTL,
maxTTL: maxTTL,
)
logScope:
topics = "libp2p discovery rendezvous"
@@ -701,9 +745,13 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
return rdv
proc new*(
T: typedesc[RendezVous], switch: Switch, rng: ref HmacDrbgContext = newRng()
T: typedesc[RendezVous],
switch: Switch,
rng: ref HmacDrbgContext = newRng(),
minDuration = MinimumDuration,
maxDuration = MaximumDuration,
): T =
let rdv = T.new(rng)
let rdv = T.new(rng, minDuration, maxDuration)
rdv.setup(switch)
return rdv

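Taken together, the TTL bounds become per-instance and both `advertise` and `request` gain peer-scoped overloads, with the old signatures kept as wrappers over `rdv.peers`. A hedged usage sketch based on the signatures above (`shardAdvertise`, the namespace, and `shardPeers` are illustrative, not part of the diff):

import chronos
import libp2p
import libp2p/protocols/rendezvous

proc shardAdvertise(switch: Switch, shardPeers: seq[PeerId]) {.async.} =
  # TTL bounds are now configurable instead of the hardcoded 2h..72h
  let rdv = RendezVous.new(switch, minDuration = 5.minutes, maxDuration = 24.hours)
  # Advertise to, and discover from, only the peers of a given shard
  await rdv.advertise("waku/shard-0", 1.hours, shardPeers)
  let records = await rdv.request("waku/shard-0", 100, shardPeers)
  discard records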
View File

@@ -81,7 +81,7 @@ type
localPrivateKey: PrivateKey
localPublicKey: seq[byte]
noiseKeys: KeyPair
commonPrologue: seq[byte]
commonPrologue*: seq[byte]
outgoing: bool
NoiseConnection* = ref object of SecureConn

View File

@@ -0,0 +1,559 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
## WebRTC transport implementation.
## For now, only WebRTC-direct is supported (i.e. browser to server).
{.push raises: [].}
import std/[sequtils, strutils]
import stew/[endians2, byteutils, objects, results]
import chronos, chronicles
import transport,
../errors,
../wire,
../multicodec,
../multihash,
../multibase,
../protobuf/minprotobuf,
../connmanager,
../muxers/muxer,
../multiaddress,
../stream/connection,
../upgrademngrs/upgrade,
../protocols/secure/noise,
../utility
import webrtc/webrtc, webrtc/datachannel, webrtc/dtls/dtls_transport, webrtc/errors
logScope:
topics = "libp2p webrtctransport"
export transport, results
const charset = toSeq("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789/+".items)
proc genUfrag*(rng: ref HmacDrbgContext, size: int): seq[byte] =
# https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md?plain=1#L73-L77
result = newSeq[byte](size)
for resultIndex in 0..<size:
let charsetIndex = rng[].generate(uint) mod charset.len()
result[resultIndex] = charset[charsetIndex].ord().uint8
const
WebRtcTransportTrackerName* = "libp2p.webrtctransport"
MaxMessageSize = 16384 # 16KiB; from the WebRtc-direct spec
# -- Message --
# Implementation of the libp2p's WebRTC message defined here:
# https://github.com/libp2p/specs/blob/master/webrtc/README.md?plain=1#L60-L79
type
MessageFlag = enum
## Flags to support half-closing and reset of streams.
## - Fin: Sender will no longer send messages
## - StopSending: Sender will no longer read messages.
## Received messages are discarded
## - ResetStream: Sender abruptly terminates the sending part of the stream.
## Receiver MAY discard any data that it already received on that stream
## - FinAck: Acknowledges the previous receipt of a message with the Fin flag set.
Fin = 0
StopSending = 1
ResetStream = 2
FinAck = 3
WebRtcMessage = object
flag: Opt[MessageFlag]
data: seq[byte]
proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] =
## Decoding WebRTC Message from raw data
var
pb = initProtoBuffer(bytes)
flagOrd: uint32
res: WebRtcMessage
if ? pb.getField(1, flagOrd).toOpt():
var flag: MessageFlag
if flag.checkedEnumAssign(flagOrd):
res.flag = Opt.some(flag)
discard ? pb.getField(2, res.data).toOpt()
Opt.some(res)
proc encode(msg: WebRtcMessage): seq[byte] =
## Encoding WebRTC Message to raw data
var pb = initProtoBuffer()
msg.flag.withValue(val):
pb.write(1, uint32(val))
if msg.data.len > 0:
pb.write(2, msg.data)
pb.finish()
pb.buffer
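# Wire format note: each WebRtcMessage is a protobuf (field 1: flag,
# field 2: data) that gets length-prefixed on the DataChannel via the
# writeLp/readLp calls on RawWebRtcStream below.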
# -- Raw WebRTC Stream --
# All the data written to or read from a WebRtcStream should be length-prefixed,
# so the `readOnce`/`write` implementation of WebRtcStream must either
# re-implement `readLP`/`writeLP`, or delegate to a `RawWebRtcStream` on which
# `readLP` and `writeLP` can be used directly. The second option is less
# redundant, so it's the one we chose.
type
RawWebRtcStream = ref object of Connection
dataChannel: DataChannelStream
readData: seq[byte]
proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream =
RawWebRtcStream(dataChannel: dataChannel)
method closeImpl*(s: RawWebRtcStream): Future[void] {.async: (raises: []).} =
# TODO: close datachannel
discard
method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
trace "RawWebrtcStream write", msg, len=msg.len()
try:
await s.dataChannel.write(msg)
except WebRtcError as exc:
raise newException(LPStreamError, exc.msg, exc)
method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
# TODO:
# if s.isClosed:
# raise newLPStreamEOFError()
if s.readData.len() == 0:
try:
let rawData = await s.dataChannel.read()
s.readData = rawData
except WebRtcError as exc:
raise newException(LPStreamError, exc.msg, exc)
trace "readOnce RawWebRtcStream", data = s.readData, nbytes
result = min(nbytes, s.readData.len)
copyMem(pbytes, addr s.readData[0], result)
s.readData = s.readData[result..^1]
# -- Stream --
type
WebRtcState = enum
Sending, Closing, Closed
WebRtcStream = ref object of Connection
rawStream: RawWebRtcStream
sendQueue: seq[(seq[byte], Future[void].Raising([CancelledError, LPStreamError]))]
sendLoop: Future[void]
readData: seq[byte]
txState: WebRtcState # Transmission
rxState: WebRtcState # Reception
proc new(
_: type WebRtcStream,
dataChannel: DataChannelStream,
oaddr: Opt[MultiAddress],
peerId: PeerId): WebRtcStream =
let stream = WebRtcStream(rawStream: RawWebRtcStream.new(dataChannel),
observedAddr: oaddr, peerId: peerId)
procCall Connection(stream).initStream()
stream
proc sender(s: WebRtcStream) {.async: (raises: [CancelledError, LPStreamError]).} =
while s.sendQueue.len > 0:
let (message, fut) = s.sendQueue.pop()
#TODO handle exceptions
await s.rawStream.writeLp(message)
if not fut.isNil: fut.complete()
proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void].Raising([CancelledError, LPStreamError]) = nil) =
let wrappedMessage = msg.encode()
s.sendQueue.insert((wrappedMessage, fut))
if s.sendLoop == nil or s.sendLoop.finished:
s.sendLoop = s.sender()
method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
# We need to make sure all of our data is sent before another write starts.
# Otherwise, two concurrent writes could get interleaved.
# We avoid this by filling s.sendQueue synchronously.
var msg = msg2
trace "WebrtcStream write", msg, len=msg.len()
var retFuture = Future[void].Raising([CancelledError, LPStreamError]).init("WebRtcStream.write")
if s.txState != Sending:
raise newException(LPStreamClosedError, "Stream is closing or closed")
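# Split the payload into chunks, keeping 16 bytes of headroom under
# MaxMessageSize for the protobuf envelope and length prefix (assumed overhead).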
while msg.len > MaxMessageSize - 16:
let
endOfMessage = MaxMessageSize - 16
wrappedMessage = WebRtcMessage(data: msg[0 ..< endOfMessage])
s.send(wrappedMessage)
msg = msg[endOfMessage .. ^1]
let
wrappedMessage = WebRtcMessage(data: msg)
s.send(wrappedMessage, retFuture)
await retFuture
proc actuallyClose(s: WebRtcStream) {.async: (raises: [CancelledError, LPStreamError]).} =
debug "stream closed", rxState=s.rxState, txState=s.txState
if s.rxState == Closed and s.txState == Closed and s.readData.len == 0:
#TODO add support to DataChannel
#await s.dataChannel.close()
await procCall Connection(s).closeImpl()
method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
if s.rxState == Closed:
raise newLPStreamEOFError()
while s.readData.len == 0 or nbytes == 0:
# Keep reading while readData is empty, or while nbytes == 0,
# so that a pending Fin or FinAck can still be processed
if s.rxState == Closed:
await s.actuallyClose()
return 0
try:
let
#TODO handle exceptions
message = await s.rawStream.readLp(MaxMessageSize)
decoded = WebRtcMessage.decode(message).tryGet()
s.readData = s.readData.concat(decoded.data)
decoded.flag.withValue(flag):
case flag:
of Fin:
# Peer won't send any more data
s.rxState = Closed
s.send(WebRtcMessage(flag: Opt.some(FinAck)))
of FinAck:
s.txState = Closed
await s.actuallyClose()
if nbytes == 0:
return 0
else: discard
except CatchableError as exc:
raise newException(LPStreamError, exc.msg, exc)
result = min(nbytes, s.readData.len)
copyMem(pbytes, addr s.readData[0], result)
s.readData = s.readData[result..^1]
method closeImpl*(s: WebRtcStream) {.async: (raises: []).} =
s.send(WebRtcMessage(flag: Opt.some(Fin)))
s.txState = Closing
while s.txState != Closed:
try:
discard await s.readOnce(nil, 0)
except CancelledError:
discard
except CatchableError:
discard
# -- Connection --
type WebRtcConnection = ref object of Connection
connection: DataChannelConnection
remoteAddress: MultiAddress
method close*(conn: WebRtcConnection) {.async: (raises: []).} =
#TODO
discard
proc new(
_: type WebRtcConnection,
conn: DataChannelConnection,
observedAddr: Opt[MultiAddress]
): WebRtcConnection =
let co = WebRtcConnection(connection: conn, observedAddr: observedAddr)
procCall Connection(co).initStream()
co
proc getStream*(conn: WebRtcConnection,
direction: Direction,
noiseHandshake: bool = false): Future[WebRtcStream] {.async: (raises: [CancelledError, LPStreamError]).} =
var datachannel =
case direction:
of Direction.In:
await conn.connection.accept()
of Direction.Out:
try:
await conn.connection.openStream(noiseHandshake)
except WebRtcError as exc:
raise newException(LPStreamError, exc.msg, exc)
return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId)
# -- Muxer --
type WebRtcMuxer = ref object of Muxer
webRtcConn: WebRtcConnection
handleFut: Future[void]
method newStream*(m: WebRtcMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} =
return await m.webRtcConn.getStream(Direction.Out)
proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} =
try:
await m.streamHandler(chann)
trace "finished handling stream"
doAssert(chann.closed, "connection not closed by handler!")
except CatchableError as exc:
trace "Exception in mplex stream handler", msg = exc.msg
await chann.close()
#TODO add atEof
method handle*(m: WebRtcMuxer): Future[void] {.async: (raises: []).} =
try:
#while not m.webRtcConn.atEof:
while true:
let incomingStream = await m.webRtcConn.getStream(Direction.In)
asyncSpawn m.handleStream(incomingStream)
except CancelledError:
discard
except CatchableError:
discard
finally:
await m.webRtcConn.close()
method close*(m: WebRtcMuxer) {.async: (raises: []).} =
m.handleFut.cancel()
await m.webRtcConn.close()
# -- Upgrader --
type
WebRtcStreamHandler = proc(conn: Connection): Future[void] {.async: (raises: []).}
WebRtcUpgrade = ref object of Upgrade
streamHandler: WebRtcStreamHandler
method upgrade*(
self: WebRtcUpgrade,
conn: Connection,
peerId: Opt[PeerId]): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
let webRtcConn = WebRtcConnection(conn)
result = WebRtcMuxer(connection: conn, webRtcConn: webRtcConn)
# Noise handshake
let noiseHandler = self.secureManagers.filterIt(it of Noise)
assert noiseHandler.len > 0
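# Per the webrtc-direct spec, the Noise prologue mixes both DTLS certificate
# fingerprints, binding the Noise handshake to this DTLS connection.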
let prologuePrefix = "libp2p-webrtc-noise:".toBytes()
let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.localCertificate()).get().data.buffer
let remoteCert = MultiHash.digest("sha2-256", webRtcConn.connection.remoteCertificate()).get().data.buffer
Noise(noiseHandler[0]).commonPrologue = prologuePrefix & remoteCert & localCert
echo "=> ", Noise(noiseHandler[0]).commonPrologue
let
stream = await webRtcConn.getStream(Out, true)
secureStream = await noiseHandler[0].handshake(
stream,
initiator = true, # we are always the initiator in webrtc-direct
peerId = peerId
)
# Peer proved its identity, we can close this
await secureStream.close()
await stream.close()
result.streamHandler = self.streamHandler
result.handler = result.handle()
# -- Transport --
type
WebRtcTransport* = ref object of Transport
connectionsTimeout: Duration
servers: seq[WebRtc]
acceptFuts: seq[Future[DataChannelConnection].Raising([CancelledError, WebRtcError])]
clients: array[Direction, seq[DataChannelConnection]]
WebRtcTransportTracker* = ref object of TrackerBase
opened*: uint64
closed*: uint64
WebRtcTransportError* = object of transport.TransportError
proc setupWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe, raises: [].}
proc getWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe.} =
result = cast[WebRtcTransportTracker](getTracker(WebRtcTransportTrackerName))
if isNil(result):
result = setupWebRtcTransportTracker()
proc dumpTracking(): string {.gcsafe.} =
var tracker = getWebRtcTransportTracker()
result = "Opened tcp transports: " & $tracker.opened & "\n" &
"Closed tcp transports: " & $tracker.closed
proc leakTransport(): bool {.gcsafe.} =
var tracker = getWebRtcTransportTracker()
result = (tracker.opened != tracker.closed)
proc setupWebRtcTransportTracker(): WebRtcTransportTracker =
result = new WebRtcTransportTracker
result.opened = 0
result.closed = 0
result.dump = dumpTracking
result.isLeaked = leakTransport
addTracker(WebRtcTransportTrackerName, result)
proc new*(
T: typedesc[WebRtcTransport],
upgrade: Upgrade,
connectionsTimeout = 10.minutes): T {.public.} =
let upgrader = WebRtcUpgrade(ms: upgrade.ms, secureManagers: upgrade.secureManagers)
upgrader.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
# TODO: replace echo with trace and find out why that fails to compile
echo "Starting stream handler"#, conn
try:
await upgrader.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
echo "Stream handler cancelled"
except CatchableError as exc:
echo "Exception in stream handler", exc.msg
finally:
await conn.closeWithEOF()
echo "Stream handler done"#, conn
let
transport = T(
upgrader: upgrader,
connectionsTimeout: connectionsTimeout)
return transport
proc stunpwprovider(usernameBytes: seq[byte]): seq[byte] =
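# webrtc-direct: the STUN USERNAME attribute is "<remote-ufrag>:<local-ufrag>",
# and the ufrag (prefixed with "libp2p+webrtc+v1/") doubles as the password.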
let username = string.fromBytes(usernameBytes)
let usersplit = username.split(":")
if usersplit.len() <= 2 and usersplit[0].startsWith("libp2p+webrtc+v1/"):
return toBytes(usersplit[0])
else:
return @[]
method start*(
self: WebRtcTransport,
addrs: seq[MultiAddress]) {.async.} =
## listen on the transport
##
if self.running:
warn "WebRtc transport already running"
return
await procCall Transport(self).start(addrs)
trace "Starting WebRtc transport"
inc getWebRtcTransportTracker().opened
for i, ma in addrs:
if not self.handles(ma):
trace "Invalid address detected, skipping!", address = ma
continue
let
transportAddress = initTAddress(ma[0..1].tryGet()).tryGet()
server = WebRtc.new(transportAddress, passwordProvider = stunpwprovider)
server.listen()
self.servers &= server
let
cert = server.localCertificate()
certHash = MultiHash.digest("sha2-256", cert).get().data.buffer
encodedCertHash = MultiBase.encode("base64", certHash).get()
self.addrs[i] = MultiAddress.init(server.localAddress(), IPPROTO_UDP).tryGet() &
MultiAddress.init(multiCodec("webrtc-direct")).tryGet() &
MultiAddress.init(multiCodec("certhash"), certHash).tryGet()
trace "Listening on", address = self.addrs[i]
proc connHandler(
self: WebRtcTransport,
client: DataChannelConnection,
observedAddr: Opt[MultiAddress],
dir: Direction
): Future[Connection] {.async.} =
trace "Handling webrtc connection", address = $observedAddr, dir = $dir,
clients = self.clients[Direction.In].len + self.clients[Direction.Out].len
let conn: Connection =
WebRtcConnection.new(
conn = client,
# dir = dir,
observedAddr = observedAddr
# timeout = self.connectionsTimeout
)
proc onClose() {.async.} =
try:
let futs = @[conn.join(), conn.join()] #TODO that's stupid
await futs[0] or futs[1]
for f in futs:
if not f.finished: await f.cancelAndWait() # cancel outstanding join()
trace "Cleaning up client"# TODO ?: , addrs = $client.remoteAddress,
# conn
self.clients[dir].keepItIf( it != client )
#TODO
#await allFuturesThrowing(
# conn.close(), client.closeWait())
except CatchableError as exc:
let useExc {.used.} = exc
debug "Error cleaning up client", errMsg = exc.msg, conn
self.clients[dir].add(client)
asyncSpawn onClose()
return conn
method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} =
if not self.running:
raise newTransportClosedError()
#TODO handle errors
if self.acceptFuts.len <= 0:
self.acceptFuts = self.servers.mapIt(it.accept())
if self.acceptFuts.len <= 0:
return
let
finished = await one(self.acceptFuts)
index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.servers[index].accept()
trace "Accept WebRTC Transport"
let transp = await finished
try:
#TODO add remoteAddress to DataChannelConnection
#let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() #TODO add /webrtc-direct
let observedAddr = MultiAddress.init("/ip4/127.0.0.1").tryGet()
return await self.connHandler(transp, Opt.some(observedAddr), Direction.In)
except CancelledError as exc:
#TODO
#transp.close()
raise exc
except CatchableError as exc:
debug "Failed to handle connection", exc = exc.msg
#TODO
#transp.close()
method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return WebRTCDirect2.match(address)

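For reference, a small usage sketch for the exported `genUfrag` helper above (assuming the module path `libp2p/transports/webrtctransport`, as used by the test file at the end of this compare):

import stew/byteutils
import libp2p/crypto/crypto
import libp2p/transports/webrtctransport

let rng = newRng()
# 32 characters drawn from the charset mandated by the webrtc-direct spec
let ufrag = string.fromBytes(rng.genUfrag(32))
echo ufrag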
View File

@@ -126,7 +126,7 @@ suite "RendezVous":
asyncTest "Various local error":
let
rdv = RendezVous.new()
rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
switch = createSwitch(rdv)
expect RendezVousError:
discard await rdv.request("A".repeat(300))
@@ -137,6 +137,14 @@ suite "RendezVous":
expect RendezVousError:
await rdv.advertise("A".repeat(300))
expect RendezVousError:
await rdv.advertise("A", 2.weeks)
await rdv.advertise("A", 73.hours)
expect RendezVousError:
await rdv.advertise("A", 5.minutes)
await rdv.advertise("A", 30.seconds)
test "Various config error":
expect RendezVousError:
discard RendezVous.new(minDuration = 30.seconds)
expect RendezVousError:
discard RendezVous.new(maxDuration = 73.hours)
expect RendezVousError:
discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)

44
testwebrtc.nim Normal file
View File

@@ -0,0 +1,44 @@
import chronos, libp2p, libp2p/transports/webrtctransport
import stew/byteutils
proc echoHandler(conn: Connection, proto: string) {.async.} =
defer: await conn.close()
while true:
try:
echo "\e[35;1m => Echo Handler <=\e[0m"
var buf = newSeq[byte](1024)
let n = await conn.readOnce(addr buf[0], 1024)
buf = buf[0 ..< n]
let msg = string.fromBytes(buf)
echo " => Echo Handler Receive: ", msg, " <="
echo " => Echo Handler Try Send: ", msg & "1", " <="
await conn.write(msg & "1")
except CatchableError as e:
echo " => Echo Handler Error: ", e.msg, " <="
break
proc main {.async.} =
let ma = MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g")
echo ma
let switch =
SwitchBuilder.new()
.withAddress(ma.tryGet()) #TODO the certhash shouldn't be necessary
.withRng(crypto.newRng())
.withMplex()
.withYamux()
.withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr))
.withNoise()
.build()
let
codec = "/echo/1.0.0"
proto = new LPProtocol
proto.handler = echoHandler
proto.codec = codec
switch.mount(proto)
await switch.start()
echo "\e[31;1m", $(switch.peerInfo.addrs[0]), "/p2p/", $(switch.peerInfo.peerId), "\e[0m"
await sleepAsync(1.hours)
waitFor main()