mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 12:58:05 -05:00
Compare commits
26 Commits
v1.6.0
...
webrtc-dir
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6df4a53ec3 | ||
|
|
255bf740ea | ||
|
|
5af460d7fd | ||
|
|
b0ce2a428f | ||
|
|
a3b8729cbe | ||
|
|
6c970911f2 | ||
|
|
5d48776b02 | ||
|
|
d389d96789 | ||
|
|
9f90721d12 | ||
|
|
8b9f34959b | ||
|
|
c327762f47 | ||
|
|
abd3653d56 | ||
|
|
afe2b08129 | ||
|
|
03ff023e94 | ||
|
|
60d48e644b | ||
|
|
58294ce156 | ||
|
|
359a448c1b | ||
|
|
7945cc754e | ||
|
|
284188a74f | ||
|
|
dab487eeb3 | ||
|
|
ad43f41ad7 | ||
|
|
f350479824 | ||
|
|
c6460ea7ce | ||
|
|
30e93e7c0a | ||
|
|
e0f2b00f9a | ||
|
|
6ab779d30a |
1
.github/workflows/daily_common.yml
vendored
1
.github/workflows/daily_common.yml
vendored
@@ -60,7 +60,6 @@ jobs:
|
||||
|
||||
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.branch }})'
|
||||
runs-on: ${{ matrix.platform.builder }}
|
||||
continue-on-error: ${{ matrix.nim.branch == 'devel' || matrix.nim.branch == 'version-2-0' }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
13
.github/workflows/interop.yml
vendored
13
.github/workflows/interop.yml
vendored
@@ -17,8 +17,9 @@ jobs:
|
||||
name: Run transport interoperability tests
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Free Disk Space (Ubuntu)
|
||||
# For some reason the original job (libp2p/test-plans) has enough disk space, but this one doesn't.
|
||||
- name: Free Disk Space
|
||||
# For some reason we have space issues while running this action. Likely while building the image.
|
||||
# This action will free up some space to avoid the issue.
|
||||
uses: jlumbroso/free-disk-space@v1.3.1
|
||||
with:
|
||||
tool-cache: true
|
||||
@@ -32,6 +33,10 @@ jobs:
|
||||
with:
|
||||
test-filter: nim-libp2p-head
|
||||
extra-versions: ${{ github.workspace }}/tests/transport-interop/version.json
|
||||
s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }}
|
||||
s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }}
|
||||
s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }}
|
||||
aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }}
|
||||
|
||||
run-hole-punching-interop:
|
||||
name: Run hole-punching interoperability tests
|
||||
@@ -46,3 +51,7 @@ jobs:
|
||||
with:
|
||||
test-filter: nim-libp2p-head
|
||||
extra-versions: ${{ github.workspace }}/tests/hole-punching-interop/version.json
|
||||
s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }}
|
||||
s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }}
|
||||
s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }}
|
||||
aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }}
|
||||
|
||||
8
.pinned
8
.pinned
@@ -1,10 +1,12 @@
|
||||
bearssl;https://github.com/status-im/nim-bearssl@#667b40440a53a58e9f922e29e20818720c62d9ac
|
||||
binary_serialization;https://github.com/status-im/nim-binary-serialization@#38a73a70fd43f3835ca01a877353858b19e39d70
|
||||
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
|
||||
chronos;https://github.com/status-im/nim-chronos@#c04576d829b8a0a1b12baaa8bc92037501b3a4a0
|
||||
dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8
|
||||
faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309
|
||||
httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
|
||||
httputils;https://github.com/status-im/nim-http-utils@#87b7cbf032c90b9e6b446081f4a647e950362cec
|
||||
json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
|
||||
mbedtls;https://github.com/status-im/nim-mbedtls.git@#740fb2f469511adc1772c5cb32395f4076b9e0c5
|
||||
metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
|
||||
ngtcp2;https://github.com/status-im/nim-ngtcp2@#6834f4756b6af58356ac9c4fef3d71db3c3ae5fe
|
||||
nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
|
||||
@@ -15,5 +17,7 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499
|
||||
stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e
|
||||
testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
|
||||
unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c
|
||||
usrsctp;https://github.com/status-im/nim-usrsctp@#c6a8d4bab44447df790e97dfc8099f7af93d435e
|
||||
webrtc;https://github.com/status-im/nim-webrtc.git@#497aea7e6c2e73d81456e60f26a49281d0b8c87f
|
||||
websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982
|
||||
zlib;https://github.com/status-im/nim-zlib@#38b72eda9d70067df4a953f56b5ed59630f2a17b
|
||||
zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5
|
||||
|
||||
@@ -318,6 +318,33 @@ proc dnsVB(vb: var VBuffer): bool =
|
||||
## DNS name validateBuffer() implementation.
|
||||
pathValidateBufferNoSlash(vb)
|
||||
|
||||
proc certHashStB(s: string, vb: var VBuffer): bool =
|
||||
## CertHash address stringToBuffer() implementation.
|
||||
var data = MultiBase.decode(s).valueOr:
|
||||
return false
|
||||
var mh: MultiHash
|
||||
if MultiHash.decode(data, mh).isOk:
|
||||
vb.writeSeq(data)
|
||||
result = true
|
||||
|
||||
proc certHashBtS(vb: var VBuffer, s: var string): bool =
|
||||
## CertHash address bufferToString() implementation.
|
||||
var address = newSeq[byte]()
|
||||
if vb.readSeq(address) > 0:
|
||||
var mh: MultiHash
|
||||
if MultiHash.decode(address, mh).isOk:
|
||||
s = MultiBase.encode("base64url", address).valueOr:
|
||||
return false
|
||||
result = true
|
||||
|
||||
proc certHashVB(vb: var VBuffer): bool =
|
||||
## CertHash address validateBuffer() implementation.
|
||||
var address = newSeq[byte]()
|
||||
if vb.readSeq(address) > 0:
|
||||
var mh: MultiHash
|
||||
if MultiHash.decode(address, mh).isOk:
|
||||
result = true
|
||||
|
||||
proc mapEq*(codec: string): MaPattern =
|
||||
## ``Equal`` operator for pattern
|
||||
result.operator = Eq
|
||||
@@ -355,6 +382,11 @@ const
|
||||
)
|
||||
TranscoderDNS* =
|
||||
Transcoder(stringToBuffer: dnsStB, bufferToString: dnsBtS, validateBuffer: dnsVB)
|
||||
TranscoderCertHash* = Transcoder(
|
||||
stringToBuffer: certHashStB,
|
||||
bufferToString: certHashBtS,
|
||||
validateBuffer: certHashVB
|
||||
)
|
||||
ProtocolsList = [
|
||||
MAProtocol(mcodec: multiCodec("ip4"), kind: Fixed, size: 4, coder: TranscoderIP4),
|
||||
MAProtocol(mcodec: multiCodec("tcp"), kind: Fixed, size: 2, coder: TranscoderPort),
|
||||
@@ -393,6 +425,9 @@ const
|
||||
MAProtocol(mcodec: multiCodec("p2p-websocket-star"), kind: Marker, size: 0),
|
||||
MAProtocol(mcodec: multiCodec("p2p-webrtc-star"), kind: Marker, size: 0),
|
||||
MAProtocol(mcodec: multiCodec("p2p-webrtc-direct"), kind: Marker, size: 0),
|
||||
MAProtocol(mcodec: multiCodec("webrtc"), kind: Marker, size: 0),
|
||||
MAProtocol(mcodec: multiCodec("webrtc-direct"), kind: Marker, size: 0),
|
||||
MAProtocol(mcodec: multiCodec("certhash"), kind: Length, size: 0, coder: TranscoderCertHash),
|
||||
]
|
||||
|
||||
DNSANY* = mapEq("dns")
|
||||
@@ -447,10 +482,12 @@ const
|
||||
mapAnd(TCP, mapEq("https")), mapAnd(IP, mapEq("https")), mapAnd(DNS, mapEq("https"))
|
||||
)
|
||||
|
||||
WebRTCDirect* = mapOr(
|
||||
WebRTCDirect* {.deprecated.} = mapOr(
|
||||
mapAnd(HTTP, mapEq("p2p-webrtc-direct")), mapAnd(HTTPS, mapEq("p2p-webrtc-direct"))
|
||||
)
|
||||
|
||||
WebRTCDirect2* = mapAnd(UDP, mapEq("webrtc-direct"), mapEq("certhash"))
|
||||
|
||||
CircuitRelay* = mapEq("p2p-circuit")
|
||||
|
||||
proc initMultiAddressCodeTable(): Table[MultiCodec, MAProtocol] {.compileTime.} =
|
||||
|
||||
@@ -387,11 +387,14 @@ const MultiCodecList = [
|
||||
("tls", 0x01C0),
|
||||
("quic", 0x01CC),
|
||||
("quic-v1", 0x01CD),
|
||||
("certhash", 0x01D2),
|
||||
("ws", 0x01DD),
|
||||
("wss", 0x01DE),
|
||||
("p2p-websocket-star", 0x01DF), # not in multicodec list
|
||||
("p2p-webrtc-star", 0x0113), # not in multicodec list
|
||||
("p2p-webrtc-direct", 0x0114), # not in multicodec list
|
||||
("webrtc-direct", 0x0118),
|
||||
("webrtc", 0x0119),
|
||||
("onion", 0x01BC),
|
||||
("onion3", 0x01BD),
|
||||
("p2p-circuit", 0x0122),
|
||||
|
||||
@@ -35,8 +35,6 @@ const
|
||||
RendezVousCodec* = "/rendezvous/1.0.0"
|
||||
MinimumDuration* = 2.hours
|
||||
MaximumDuration = 72.hours
|
||||
MinimumTTL = MinimumDuration.seconds.uint64
|
||||
MaximumTTL = MaximumDuration.seconds.uint64
|
||||
RegistrationLimitPerPeer = 1000
|
||||
DiscoverLimit = 1000'u64
|
||||
SemaphoreDefaultSize = 5
|
||||
@@ -320,6 +318,10 @@ type
|
||||
peers: seq[PeerId]
|
||||
cookiesSaved: Table[PeerId, Table[string, seq[byte]]]
|
||||
switch: Switch
|
||||
minDuration: Duration
|
||||
maxDuration: Duration
|
||||
minTTL: uint64
|
||||
maxTTL: uint64
|
||||
|
||||
proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] =
|
||||
if spr.len == 0:
|
||||
@@ -395,7 +397,7 @@ proc save(
|
||||
rdv.registered.add(
|
||||
RegisteredData(
|
||||
peerId: peerId,
|
||||
expiration: Moment.now() + r.ttl.get(MinimumTTL).int64.seconds,
|
||||
expiration: Moment.now() + r.ttl.get(rdv.minTTL).int64.seconds,
|
||||
data: r,
|
||||
)
|
||||
)
|
||||
@@ -409,8 +411,8 @@ proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
|
||||
libp2p_rendezvous_register.inc()
|
||||
if r.ns.len notin 1 .. 255:
|
||||
return conn.sendRegisterResponseError(InvalidNamespace)
|
||||
let ttl = r.ttl.get(MinimumTTL)
|
||||
if ttl notin MinimumTTL .. MaximumTTL:
|
||||
let ttl = r.ttl.get(rdv.minTTL)
|
||||
if ttl notin rdv.minTTL .. rdv.maxTTL:
|
||||
return conn.sendRegisterResponseError(InvalidTTL)
|
||||
let pr = checkPeerRecord(r.signedPeerRecord, conn.peerId)
|
||||
if pr.isErr():
|
||||
@@ -506,24 +508,35 @@ proc advertisePeer(rdv: RendezVous, peer: PeerId, msg: seq[byte]) {.async.} =
|
||||
await rdv.sema.acquire()
|
||||
discard await advertiseWrap().withTimeout(5.seconds)
|
||||
|
||||
method advertise*(
|
||||
rdv: RendezVous, ns: string, ttl: Duration = MinimumDuration
|
||||
) {.async, base.} =
|
||||
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
|
||||
raise newException(RendezVousError, "Wrong Signed Peer Record")
|
||||
proc advertise*(
|
||||
rdv: RendezVous, ns: string, ttl: Duration, peers: seq[PeerId]
|
||||
) {.async.} =
|
||||
if ns.len notin 1 .. 255:
|
||||
raise newException(RendezVousError, "Invalid namespace")
|
||||
if ttl notin MinimumDuration .. MaximumDuration:
|
||||
raise newException(RendezVousError, "Invalid time to live")
|
||||
|
||||
if ttl notin rdv.minDuration .. rdv.maxDuration:
|
||||
raise newException(RendezVousError, "Invalid time to live: " & $ttl)
|
||||
|
||||
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
|
||||
raise newException(RendezVousError, "Wrong Signed Peer Record")
|
||||
|
||||
let
|
||||
r = Register(ns: ns, signedPeerRecord: sprBuff, ttl: Opt.some(ttl.seconds.uint64))
|
||||
msg = encode(Message(msgType: MessageType.Register, register: Opt.some(r)))
|
||||
|
||||
rdv.save(ns, rdv.switch.peerInfo.peerId, r)
|
||||
let fut = collect(newSeq()):
|
||||
for peer in rdv.peers:
|
||||
|
||||
let futs = collect(newSeq()):
|
||||
for peer in peers:
|
||||
trace "Send Advertise", peerId = peer, ns
|
||||
rdv.advertisePeer(peer, msg.buffer)
|
||||
await allFutures(fut)
|
||||
|
||||
await allFutures(futs)
|
||||
|
||||
method advertise*(
|
||||
rdv: RendezVous, ns: string, ttl: Duration = rdv.minDuration
|
||||
) {.async, base.} =
|
||||
await rdv.advertise(ns, ttl, rdv.peers)
|
||||
|
||||
proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
|
||||
let
|
||||
@@ -540,9 +553,8 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
|
||||
@[]
|
||||
|
||||
proc request*(
|
||||
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
|
||||
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
|
||||
): Future[seq[PeerRecord]] {.async.} =
|
||||
let nsSalted = ns & rdv.salt
|
||||
var
|
||||
s: Table[PeerId, (PeerRecord, Register)]
|
||||
limit: uint64
|
||||
@@ -587,8 +599,8 @@ proc request*(
|
||||
for r in resp.registrations:
|
||||
if limit == 0:
|
||||
return
|
||||
let ttl = r.ttl.get(MaximumTTL + 1)
|
||||
if ttl > MaximumTTL:
|
||||
let ttl = r.ttl.get(rdv.maxTTL + 1)
|
||||
if ttl > rdv.maxTTL:
|
||||
continue
|
||||
let
|
||||
spr = SignedPeerRecord.decode(r.signedPeerRecord).valueOr:
|
||||
@@ -596,7 +608,7 @@ proc request*(
|
||||
pr = spr.data
|
||||
if s.hasKey(pr.peerId):
|
||||
let (prSaved, rSaved) = s[pr.peerId]
|
||||
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(MaximumTTL) < ttl) or
|
||||
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(rdv.maxTTL) < ttl) or
|
||||
prSaved.seqNo < pr.seqNo:
|
||||
s[pr.peerId] = (pr, r)
|
||||
else:
|
||||
@@ -605,8 +617,6 @@ proc request*(
|
||||
for (_, r) in s.values():
|
||||
rdv.save(ns, peer, r, false)
|
||||
|
||||
# copy to avoid resizes during the loop
|
||||
let peers = rdv.peers
|
||||
for peer in peers:
|
||||
if limit == 0:
|
||||
break
|
||||
@@ -621,6 +631,11 @@ proc request*(
|
||||
trace "exception catch in request", description = exc.msg
|
||||
return toSeq(s.values()).mapIt(it[0])
|
||||
|
||||
proc request*(
|
||||
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
|
||||
): Future[seq[PeerRecord]] {.async.} =
|
||||
await rdv.request(ns, l, rdv.peers)
|
||||
|
||||
proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
|
||||
let nsSalted = ns & rdv.salt
|
||||
try:
|
||||
@@ -630,16 +645,15 @@ proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
|
||||
# TODO: find a way to improve this, maybe something similar to the advertise
|
||||
proc unsubscribe*(rdv: RendezVous, ns: string, peerIds: seq[PeerId]) {.async.} =
|
||||
if ns.len notin 1 .. 255:
|
||||
raise newException(RendezVousError, "Invalid namespace")
|
||||
rdv.unsubscribeLocally(ns)
|
||||
|
||||
let msg = encode(
|
||||
Message(msgType: MessageType.Unregister, unregister: Opt.some(Unregister(ns: ns)))
|
||||
)
|
||||
|
||||
proc unsubscribePeer(rdv: RendezVous, peerId: PeerId) {.async.} =
|
||||
proc unsubscribePeer(peerId: PeerId) {.async.} =
|
||||
try:
|
||||
let conn = await rdv.switch.dial(peerId, RendezVousCodec)
|
||||
defer:
|
||||
@@ -648,8 +662,16 @@ proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
|
||||
except CatchableError as exc:
|
||||
trace "exception while unsubscribing", description = exc.msg
|
||||
|
||||
for peer in rdv.peers:
|
||||
discard await rdv.unsubscribePeer(peer).withTimeout(5.seconds)
|
||||
let futs = collect(newSeq()):
|
||||
for peer in peerIds:
|
||||
unsubscribePeer(peer)
|
||||
|
||||
discard await allFutures(futs).withTimeout(5.seconds)
|
||||
|
||||
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
|
||||
rdv.unsubscribeLocally(ns)
|
||||
|
||||
await rdv.unsubscribe(ns, rdv.peers)
|
||||
|
||||
proc setup*(rdv: RendezVous, switch: Switch) =
|
||||
rdv.switch = switch
|
||||
@@ -662,7 +684,25 @@ proc setup*(rdv: RendezVous, switch: Switch) =
|
||||
rdv.switch.addPeerEventHandler(handlePeer, Joined)
|
||||
rdv.switch.addPeerEventHandler(handlePeer, Left)
|
||||
|
||||
proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
|
||||
proc new*(
|
||||
T: typedesc[RendezVous],
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
minDuration = MinimumDuration,
|
||||
maxDuration = MaximumDuration,
|
||||
): T {.raises: [RendezVousError].} =
|
||||
if minDuration < 1.minutes:
|
||||
raise newException(RendezVousError, "TTL too short: 1 minute minimum")
|
||||
|
||||
if maxDuration > 72.hours:
|
||||
raise newException(RendezVousError, "TTL too long: 72 hours maximum")
|
||||
|
||||
if minDuration >= maxDuration:
|
||||
raise newException(RendezVousError, "Minimum TTL longer than maximum")
|
||||
|
||||
let
|
||||
minTTL = minDuration.seconds.uint64
|
||||
maxTTL = maxDuration.seconds.uint64
|
||||
|
||||
let rdv = T(
|
||||
rng: rng,
|
||||
salt: string.fromBytes(generateBytes(rng[], 8)),
|
||||
@@ -670,6 +710,10 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
|
||||
defaultDT: Moment.now() - 1.days,
|
||||
#registerEvent: newAsyncEvent(),
|
||||
sema: newAsyncSemaphore(SemaphoreDefaultSize),
|
||||
minDuration: minDuration,
|
||||
maxDuration: maxDuration,
|
||||
minTTL: minTTL,
|
||||
maxTTL: maxTTL,
|
||||
)
|
||||
logScope:
|
||||
topics = "libp2p discovery rendezvous"
|
||||
@@ -701,9 +745,13 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
|
||||
return rdv
|
||||
|
||||
proc new*(
|
||||
T: typedesc[RendezVous], switch: Switch, rng: ref HmacDrbgContext = newRng()
|
||||
T: typedesc[RendezVous],
|
||||
switch: Switch,
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
minDuration = MinimumDuration,
|
||||
maxDuration = MaximumDuration,
|
||||
): T =
|
||||
let rdv = T.new(rng)
|
||||
let rdv = T.new(rng, minDuration, maxDuration)
|
||||
rdv.setup(switch)
|
||||
return rdv
|
||||
|
||||
|
||||
@@ -81,7 +81,7 @@ type
|
||||
localPrivateKey: PrivateKey
|
||||
localPublicKey: seq[byte]
|
||||
noiseKeys: KeyPair
|
||||
commonPrologue: seq[byte]
|
||||
commonPrologue*: seq[byte]
|
||||
outgoing: bool
|
||||
|
||||
NoiseConnection* = ref object of SecureConn
|
||||
|
||||
559
libp2p/transports/webrtctransport.nim
Normal file
559
libp2p/transports/webrtctransport.nim
Normal file
@@ -0,0 +1,559 @@
|
||||
# Nim-LibP2P
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
## WebRtc transport implementation
|
||||
## For now, only support WebRtc direct (ie browser to server)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[sequtils, strutils]
|
||||
import stew/[endians2, byteutils, objects, results]
|
||||
import chronos, chronicles
|
||||
import transport,
|
||||
../errors,
|
||||
../wire,
|
||||
../multicodec,
|
||||
../multihash,
|
||||
../multibase,
|
||||
../protobuf/minprotobuf,
|
||||
../connmanager,
|
||||
../muxers/muxer,
|
||||
../multiaddress,
|
||||
../stream/connection,
|
||||
../upgrademngrs/upgrade,
|
||||
../protocols/secure/noise,
|
||||
../utility
|
||||
|
||||
import webrtc/webrtc, webrtc/datachannel, webrtc/dtls/dtls_transport, webrtc/errors
|
||||
|
||||
logScope:
|
||||
topics = "libp2p webrtctransport"
|
||||
|
||||
export transport, results
|
||||
|
||||
const charset = toSeq("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789/+".items)
|
||||
proc genUfrag*(rng: ref HmacDrbgContext, size: int): seq[byte] =
|
||||
# https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md?plain=1#L73-L77
|
||||
result = newSeq[byte](size)
|
||||
for resultIndex in 0..<size:
|
||||
let charsetIndex = rng[].generate(uint) mod charset.len()
|
||||
result[resultIndex] = charset[charsetIndex].ord().uint8
|
||||
|
||||
const
|
||||
WebRtcTransportTrackerName* = "libp2p.webrtctransport"
|
||||
MaxMessageSize = 16384 # 16KiB; from the WebRtc-direct spec
|
||||
|
||||
# -- Message --
|
||||
# Implementation of the libp2p's WebRTC message defined here:
|
||||
# https://github.com/libp2p/specs/blob/master/webrtc/README.md?plain=1#L60-L79
|
||||
|
||||
type
|
||||
MessageFlag = enum
|
||||
## Flags to support half-closing and reset of streams.
|
||||
## - Fin: Sender will no longer send messages
|
||||
## - StopSending: Sender will no longer read messages.
|
||||
## Received messages are discarded
|
||||
## - ResetStream: Sender abruptly terminates the sending part of the stream.
|
||||
## Receiver MAY discard any data that it already received on that stream
|
||||
## - FinAck: Acknowledges the previous receipt of a message with the Fin flag set.
|
||||
Fin = 0
|
||||
StopSending = 1
|
||||
ResetStream = 2
|
||||
FinAck = 3
|
||||
|
||||
WebRtcMessage = object
|
||||
flag: Opt[MessageFlag]
|
||||
data: seq[byte]
|
||||
|
||||
proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] =
|
||||
## Decoding WebRTC Message from raw data
|
||||
var
|
||||
pb = initProtoBuffer(bytes)
|
||||
flagOrd: uint32
|
||||
res: WebRtcMessage
|
||||
if ? pb.getField(1, flagOrd).toOpt():
|
||||
var flag: MessageFlag
|
||||
if flag.checkedEnumAssign(flagOrd):
|
||||
res.flag = Opt.some(flag)
|
||||
|
||||
discard ? pb.getField(2, res.data).toOpt()
|
||||
Opt.some(res)
|
||||
|
||||
proc encode(msg: WebRtcMessage): seq[byte] =
|
||||
## Encoding WebRTC Message to raw data
|
||||
var pb = initProtoBuffer()
|
||||
|
||||
msg.flag.withValue(val):
|
||||
pb.write(1, uint32(val))
|
||||
|
||||
if msg.data.len > 0:
|
||||
pb.write(2, msg.data)
|
||||
|
||||
pb.finish()
|
||||
pb.buffer
|
||||
|
||||
# -- Raw WebRTC Stream --
|
||||
# All the data written to or read from a WebRtcStream should be length-prefixed
|
||||
# so `readOnce`/`write` WebRtcStream implementation must either recode
|
||||
# `readLP`/`writeLP`, or implement a `RawWebRtcStream` on which we can
|
||||
# directly use `readLP` and `writeLP`. The second solution is the less redundant,
|
||||
# so it's the one we've chosen.
|
||||
|
||||
type
|
||||
RawWebRtcStream = ref object of Connection
|
||||
dataChannel: DataChannelStream
|
||||
readData: seq[byte]
|
||||
|
||||
proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream =
|
||||
RawWebRtcStream(dataChannel: dataChannel)
|
||||
|
||||
method closeImpl*(s: RawWebRtcStream): Future[void] {.async: (raises: []).} =
|
||||
# TODO: close datachannel
|
||||
discard
|
||||
|
||||
method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
trace "RawWebrtcStream write", msg, len=msg.len()
|
||||
try:
|
||||
await s.dataChannel.write(msg)
|
||||
except WebRtcError as exc:
|
||||
raise newException(LPStreamError, exc.msg, exc)
|
||||
|
||||
method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
# TODO:
|
||||
# if s.isClosed:
|
||||
# raise newLPStreamEOFError()
|
||||
|
||||
if s.readData.len() == 0:
|
||||
try:
|
||||
let rawData = await s.dataChannel.read()
|
||||
s.readData = rawData
|
||||
except WebRtcError as exc:
|
||||
raise newException(LPStreamError, exc.msg, exc)
|
||||
trace "readOnce RawWebRtcStream", data = s.readData, nbytes
|
||||
|
||||
result = min(nbytes, s.readData.len)
|
||||
copyMem(pbytes, addr s.readData[0], result)
|
||||
s.readData = s.readData[result..^1]
|
||||
|
||||
# -- Stream --
|
||||
|
||||
type
|
||||
WebRtcState = enum
|
||||
Sending, Closing, Closed
|
||||
|
||||
WebRtcStream = ref object of Connection
|
||||
rawStream: RawWebRtcStream
|
||||
sendQueue: seq[(seq[byte], Future[void].Raising([CancelledError, LPStreamError]))]
|
||||
sendLoop: Future[void]
|
||||
readData: seq[byte]
|
||||
txState: WebRtcState # Transmission
|
||||
rxState: WebRtcState # Reception
|
||||
|
||||
proc new(
|
||||
_: type WebRtcStream,
|
||||
dataChannel: DataChannelStream,
|
||||
oaddr: Opt[MultiAddress],
|
||||
peerId: PeerId): WebRtcStream =
|
||||
let stream = WebRtcStream(rawStream: RawWebRtcStream.new(dataChannel),
|
||||
observedAddr: oaddr, peerId: peerId)
|
||||
procCall Connection(stream).initStream()
|
||||
stream
|
||||
|
||||
proc sender(s: WebRtcStream) {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
while s.sendQueue.len > 0:
|
||||
let (message, fut) = s.sendQueue.pop()
|
||||
#TODO handle exceptions
|
||||
await s.rawStream.writeLp(message)
|
||||
if not fut.isNil: fut.complete()
|
||||
|
||||
proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void].Raising([CancelledError, LPStreamError]) = nil) =
|
||||
let wrappedMessage = msg.encode()
|
||||
s.sendQueue.insert((wrappedMessage, fut))
|
||||
|
||||
if s.sendLoop == nil or s.sendLoop.finished:
|
||||
s.sendLoop = s.sender()
|
||||
|
||||
method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
# We need to make sure we send all of our data before another write
|
||||
# Otherwise, two concurrent writes could get intertwined
|
||||
# We avoid this by filling the s.sendQueue synchronously
|
||||
var msg = msg2
|
||||
trace "WebrtcStream write", msg, len=msg.len()
|
||||
var retFuture = Future[void].Raising([CancelledError, LPStreamError]).init("WebRtcStream.write")
|
||||
if s.txState != Sending:
|
||||
raise newException(LPStreamClosedError, "whatever")
|
||||
|
||||
var messages: seq[seq[byte]]
|
||||
while msg.len > MaxMessageSize - 16:
|
||||
let
|
||||
endOfMessage = MaxMessageSize - 16
|
||||
wrappedMessage = WebRtcMessage(data: msg[0 ..< endOfMessage])
|
||||
s.send(wrappedMessage)
|
||||
msg = msg[endOfMessage .. ^1]
|
||||
|
||||
let
|
||||
wrappedMessage = WebRtcMessage(data: msg)
|
||||
s.send(wrappedMessage, retFuture)
|
||||
|
||||
await retFuture
|
||||
|
||||
proc actuallyClose(s: WebRtcStream) {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
debug "stream closed", rxState=s.rxState, txState=s.txState
|
||||
if s.rxState == Closed and s.txState == Closed and s.readData.len == 0:
|
||||
#TODO add support to DataChannel
|
||||
#await s.dataChannel.close()
|
||||
await procCall Connection(s).closeImpl()
|
||||
|
||||
method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
if s.rxState == Closed:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
while s.readData.len == 0 or nbytes == 0:
|
||||
# Check if there's no data left in readData or if nbytes is equal to 0
|
||||
# in order to read an eventual Fin or FinAck
|
||||
if s.rxState == Closed:
|
||||
await s.actuallyClose()
|
||||
return 0
|
||||
|
||||
try:
|
||||
let
|
||||
#TODO handle exceptions
|
||||
message = await s.rawStream.readLp(MaxMessageSize)
|
||||
decoded = WebRtcMessage.decode(message).tryGet()
|
||||
|
||||
s.readData = s.readData.concat(decoded.data)
|
||||
|
||||
decoded.flag.withValue(flag):
|
||||
case flag:
|
||||
of Fin:
|
||||
# Peer won't send any more data
|
||||
s.rxState = Closed
|
||||
s.send(WebRtcMessage(flag: Opt.some(FinAck)))
|
||||
of FinAck:
|
||||
s.txState = Closed
|
||||
await s.actuallyClose()
|
||||
if nbytes == 0:
|
||||
return 0
|
||||
else: discard
|
||||
except CatchableError as exc:
|
||||
raise newException(LPStreamError, exc.msg, exc)
|
||||
|
||||
|
||||
result = min(nbytes, s.readData.len)
|
||||
copyMem(pbytes, addr s.readData[0], result)
|
||||
s.readData = s.readData[result..^1]
|
||||
|
||||
method closeImpl*(s: WebRtcStream) {.async: (raises: []).} =
|
||||
s.send(WebRtcMessage(flag: Opt.some(Fin)))
|
||||
s.txState = Closing
|
||||
while s.txState != Closed:
|
||||
try:
|
||||
discard await s.readOnce(nil, 0)
|
||||
except CatchableError as exc:
|
||||
discard
|
||||
except CancelledError as exc:
|
||||
discard
|
||||
|
||||
# -- Connection --
|
||||
|
||||
type WebRtcConnection = ref object of Connection
|
||||
connection: DataChannelConnection
|
||||
remoteAddress: MultiAddress
|
||||
|
||||
method close*(conn: WebRtcConnection) {.async: (raises: []).} =
|
||||
#TODO
|
||||
discard
|
||||
|
||||
proc new(
|
||||
_: type WebRtcConnection,
|
||||
conn: DataChannelConnection,
|
||||
observedAddr: Opt[MultiAddress]
|
||||
): WebRtcConnection =
|
||||
let co = WebRtcConnection(connection: conn, observedAddr: observedAddr)
|
||||
procCall Connection(co).initStream()
|
||||
co
|
||||
|
||||
proc getStream*(conn: WebRtcConnection,
|
||||
direction: Direction,
|
||||
noiseHandshake: bool = false): Future[WebRtcStream] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
var datachannel =
|
||||
case direction:
|
||||
of Direction.In:
|
||||
await conn.connection.accept()
|
||||
of Direction.Out:
|
||||
try:
|
||||
await conn.connection.openStream(noiseHandshake)
|
||||
except WebRtcError as exc:
|
||||
raise newException(LPStreamError, exc.msg, exc)
|
||||
return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId)
|
||||
|
||||
# -- Muxer --
|
||||
|
||||
type WebRtcMuxer = ref object of Muxer
|
||||
webRtcConn: WebRtcConnection
|
||||
handleFut: Future[void]
|
||||
|
||||
method newStream*(m: WebRtcMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} =
|
||||
return await m.webRtcConn.getStream(Direction.Out)
|
||||
|
||||
proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} =
|
||||
try:
|
||||
await m.streamHandler(chann)
|
||||
trace "finished handling stream"
|
||||
doAssert(chann.closed, "connection not closed by handler!")
|
||||
except CatchableError as exc:
|
||||
trace "Exception in mplex stream handler", msg = exc.msg
|
||||
await chann.close()
|
||||
|
||||
#TODO add atEof
|
||||
|
||||
method handle*(m: WebRtcMuxer): Future[void] {.async: (raises: []).} =
|
||||
try:
|
||||
#while not m.webRtcConn.atEof:
|
||||
while true:
|
||||
let incomingStream = await m.webRtcConn.getStream(Direction.In)
|
||||
asyncSpawn m.handleStream(incomingStream)
|
||||
except CatchableError as exc:
|
||||
discard
|
||||
except CancelledError as exc:
|
||||
discard
|
||||
finally:
|
||||
await m.webRtcConn.close()
|
||||
|
||||
method close*(m: WebRtcMuxer) {.async: (raises: []).} =
|
||||
m.handleFut.cancel()
|
||||
await m.webRtcConn.close()
|
||||
|
||||
# -- Upgrader --
|
||||
|
||||
type
|
||||
WebRtcStreamHandler = proc(conn: Connection): Future[void] {.async: (raises: []).}
|
||||
WebRtcUpgrade = ref object of Upgrade
|
||||
streamHandler: WebRtcStreamHandler
|
||||
|
||||
method upgrade*(
|
||||
self: WebRtcUpgrade,
|
||||
conn: Connection,
|
||||
peerId: Opt[PeerId]): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
|
||||
|
||||
let webRtcConn = WebRtcConnection(conn)
|
||||
result = WebRtcMuxer(connection: conn, webRtcConn: webRtcConn)
|
||||
|
||||
# Noise handshake
|
||||
let noiseHandler = self.secureManagers.filterIt(it of Noise)
|
||||
assert noiseHandler.len > 0
|
||||
|
||||
let xx = "libp2p-webrtc-noise:".toBytes()
|
||||
let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.localCertificate()).get().data.buffer
|
||||
let remoteCert = MultiHash.digest("sha2-256", webRtcConn.connection.remoteCertificate()).get().data.buffer
|
||||
((Noise)noiseHandler[0]).commonPrologue = xx & remoteCert & localCert
|
||||
echo "=> ", ((Noise)noiseHandler[0]).commonPrologue
|
||||
|
||||
let
|
||||
stream = await webRtcConn.getStream(Out, true)
|
||||
secureStream = await noiseHandler[0].handshake(
|
||||
stream,
|
||||
initiator = true, # we are always the initiator in webrtc-direct
|
||||
peerId = peerId
|
||||
)
|
||||
|
||||
# Peer proved its identity, we can close this
|
||||
await secureStream.close()
|
||||
await stream.close()
|
||||
|
||||
result.streamHandler = self.streamHandler
|
||||
result.handler = result.handle()
|
||||
|
||||
# -- Transport --
|
||||
|
||||
type
|
||||
WebRtcTransport* = ref object of Transport
|
||||
connectionsTimeout: Duration
|
||||
servers: seq[WebRtc]
|
||||
acceptFuts: seq[Future[DataChannelConnection].Raising([CancelledError, WebRtcError])]
|
||||
clients: array[Direction, seq[DataChannelConnection]]
|
||||
|
||||
WebRtcTransportTracker* = ref object of TrackerBase
|
||||
opened*: uint64
|
||||
closed*: uint64
|
||||
|
||||
WebRtcTransportError* = object of transport.TransportError
|
||||
|
||||
proc setupWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe, raises: [].}
|
||||
|
||||
proc getWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe.} =
|
||||
result = cast[WebRtcTransportTracker](getTracker(WebRtcTransportTrackerName))
|
||||
if isNil(result):
|
||||
result = setupWebRtcTransportTracker()
|
||||
|
||||
proc dumpTracking(): string {.gcsafe.} =
|
||||
var tracker = getWebRtcTransportTracker()
|
||||
result = "Opened tcp transports: " & $tracker.opened & "\n" &
|
||||
"Closed tcp transports: " & $tracker.closed
|
||||
|
||||
proc leakTransport(): bool {.gcsafe.} =
|
||||
var tracker = getWebRtcTransportTracker()
|
||||
result = (tracker.opened != tracker.closed)
|
||||
|
||||
proc setupWebRtcTransportTracker(): WebRtcTransportTracker =
|
||||
result = new WebRtcTransportTracker
|
||||
result.opened = 0
|
||||
result.closed = 0
|
||||
result.dump = dumpTracking
|
||||
result.isLeaked = leakTransport
|
||||
addTracker(WebRtcTransportTrackerName, result)
|
||||
|
||||
proc new*(
|
||||
T: typedesc[WebRtcTransport],
|
||||
upgrade: Upgrade,
|
||||
connectionsTimeout = 10.minutes): T {.public.} =
|
||||
|
||||
let upgrader = WebRtcUpgrade(ms: upgrade.ms, secureManagers: upgrade.secureManagers)
|
||||
upgrader.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
||||
# TODO: replace echo by trace and find why it fails compiling
|
||||
echo "Starting stream handler"#, conn
|
||||
try:
|
||||
await upgrader.ms.handle(conn) # handle incoming connection
|
||||
except CancelledError as exc:
|
||||
echo "Stream handler cancelled"
|
||||
except CatchableError as exc:
|
||||
echo "Exception in stream handler", exc.msg
|
||||
finally:
|
||||
await conn.closeWithEOF()
|
||||
echo "Stream handler done"#, conn
|
||||
|
||||
let
|
||||
transport = T(
|
||||
upgrader: upgrader,
|
||||
connectionsTimeout: connectionsTimeout)
|
||||
|
||||
return transport
|
||||
|
||||
proc stunpwprovider(usernameBytes: seq[byte]): seq[byte] =
|
||||
let username = string.fromBytes(usernameBytes)
|
||||
let usersplit = username.split(":")
|
||||
if usersplit.len() <= 2 and usersplit[0].startsWith("libp2p+webrtc+v1/"):
|
||||
return toBytes(usersplit[0])
|
||||
else:
|
||||
return @[]
|
||||
|
||||
method start*(
|
||||
self: WebRtcTransport,
|
||||
addrs: seq[MultiAddress]) {.async.} =
|
||||
## listen on the transport
|
||||
##
|
||||
|
||||
if self.running:
|
||||
warn "WebRtc transport already running"
|
||||
return
|
||||
|
||||
await procCall Transport(self).start(addrs)
|
||||
trace "Starting WebRtc transport"
|
||||
inc getWebRtcTransportTracker().opened
|
||||
|
||||
for i, ma in addrs:
|
||||
if not self.handles(ma):
|
||||
trace "Invalid address detected, skipping!", address = ma
|
||||
continue
|
||||
|
||||
let
|
||||
transportAddress = initTAddress(ma[0..1].tryGet()).tryGet()
|
||||
server = WebRtc.new(transportAddress, passwordProvider = stunpwprovider)
|
||||
server.listen()
|
||||
|
||||
self.servers &= server
|
||||
|
||||
let
|
||||
cert = server.localCertificate()
|
||||
certHash = MultiHash.digest("sha2-256", cert).get().data.buffer
|
||||
encodedCertHash = MultiBase.encode("base64", certHash).get()
|
||||
self.addrs[i] = MultiAddress.init(server.localAddress(), IPPROTO_UDP).tryGet() &
|
||||
MultiAddress.init(multiCodec("webrtc-direct")).tryGet() &
|
||||
MultiAddress.init(multiCodec("certhash"), certHash).tryGet()
|
||||
|
||||
trace "Listening on", address = self.addrs[i]
|
||||
|
||||
proc connHandler(
|
||||
self: WebRtcTransport,
|
||||
client: DataChannelConnection,
|
||||
observedAddr: Opt[MultiAddress],
|
||||
dir: Direction
|
||||
): Future[Connection] {.async.} =
|
||||
trace "Handling webrtc connection", address = $observedAddr, dir = $dir,
|
||||
clients = self.clients[Direction.In].len + self.clients[Direction.Out].len
|
||||
|
||||
let conn: Connection =
|
||||
WebRtcConnection.new(
|
||||
conn = client,
|
||||
# dir = dir,
|
||||
observedAddr = observedAddr
|
||||
# timeout = self.connectionsTimeout
|
||||
)
|
||||
|
||||
proc onClose() {.async.} =
|
||||
try:
|
||||
let futs = @[conn.join(), conn.join()] #TODO that's stupid
|
||||
await futs[0] or futs[1]
|
||||
for f in futs:
|
||||
if not f.finished: await f.cancelAndWait() # cancel outstanding join()
|
||||
|
||||
trace "Cleaning up client"# TODO ?: , addrs = $client.remoteAddress,
|
||||
# conn
|
||||
|
||||
self.clients[dir].keepItIf( it != client )
|
||||
#TODO
|
||||
#await allFuturesThrowing(
|
||||
# conn.close(), client.closeWait())
|
||||
|
||||
except CatchableError as exc:
|
||||
let useExc {.used.} = exc
|
||||
debug "Error cleaning up client", errMsg = exc.msg, conn
|
||||
|
||||
self.clients[dir].add(client)
|
||||
asyncSpawn onClose()
|
||||
|
||||
return conn
|
||||
|
||||
method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} =
|
||||
if not self.running:
|
||||
raise newTransportClosedError()
|
||||
|
||||
#TODO handle errors
|
||||
if self.acceptFuts.len <= 0:
|
||||
self.acceptFuts = self.servers.mapIt(it.accept())
|
||||
|
||||
if self.acceptFuts.len <= 0:
|
||||
return
|
||||
|
||||
let
|
||||
finished = await one(self.acceptFuts)
|
||||
index = self.acceptFuts.find(finished)
|
||||
|
||||
self.acceptFuts[index] = self.servers[index].accept()
|
||||
trace "Accept WebRTC Transport"
|
||||
|
||||
let transp = await finished
|
||||
try:
|
||||
#TODO add remoteAddress to DataChannelConnection
|
||||
#let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() #TODO add /webrtc-direct
|
||||
let observedAddr = MultiAddress.init("/ip4/127.0.0.1").tryGet()
|
||||
return await self.connHandler(transp, Opt.some(observedAddr), Direction.In)
|
||||
except CancelledError as exc:
|
||||
#TODO
|
||||
#transp.close()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Failed to handle connection", exc = exc.msg
|
||||
#TODO
|
||||
#transp.close()
|
||||
|
||||
method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||
if procCall Transport(t).handles(address):
|
||||
if address.protocols.isOk:
|
||||
return WebRTCDirect2.match(address)
|
||||
@@ -126,7 +126,7 @@ suite "RendezVous":
|
||||
|
||||
asyncTest "Various local error":
|
||||
let
|
||||
rdv = RendezVous.new()
|
||||
rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
|
||||
switch = createSwitch(rdv)
|
||||
expect RendezVousError:
|
||||
discard await rdv.request("A".repeat(300))
|
||||
@@ -137,6 +137,14 @@ suite "RendezVous":
|
||||
expect RendezVousError:
|
||||
await rdv.advertise("A".repeat(300))
|
||||
expect RendezVousError:
|
||||
await rdv.advertise("A", 2.weeks)
|
||||
await rdv.advertise("A", 73.hours)
|
||||
expect RendezVousError:
|
||||
await rdv.advertise("A", 5.minutes)
|
||||
await rdv.advertise("A", 30.seconds)
|
||||
|
||||
test "Various config error":
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(minDuration = 30.seconds)
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(maxDuration = 73.hours)
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)
|
||||
|
||||
44
testwebrtc.nim
Normal file
44
testwebrtc.nim
Normal file
@@ -0,0 +1,44 @@
|
||||
import chronos, libp2p, libp2p/transports/webrtctransport
|
||||
import stew/byteutils
|
||||
|
||||
proc echoHandler(conn: Connection, proto: string) {.async.} =
|
||||
defer: await conn.close()
|
||||
while true:
|
||||
try:
|
||||
echo "\e[35;1m => Echo Handler <=\e[0m"
|
||||
var xx = newSeq[byte](1024)
|
||||
let aa = await conn.readOnce(addr xx[0], 1024)
|
||||
xx = xx[0..<aa]
|
||||
let msg = string.fromBytes(xx)
|
||||
echo " => Echo Handler Receive: ", msg, " <="
|
||||
echo " => Echo Handler Try Send: ", msg & "1", " <="
|
||||
await conn.write(msg & "1")
|
||||
except CatchableError as e:
|
||||
echo " => Echo Handler Error: ", e.msg, " <="
|
||||
break
|
||||
|
||||
proc main {.async.} =
|
||||
let ma = MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g")
|
||||
echo ma
|
||||
let switch =
|
||||
SwitchBuilder.new()
|
||||
.withAddress(ma.tryGet()) #TODO the certhash shouldn't be necessary
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withYamux()
|
||||
.withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr))
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
let
|
||||
codec = "/echo/1.0.0"
|
||||
proto = new LPProtocol
|
||||
proto.handler = echoHandler
|
||||
proto.codec = codec
|
||||
|
||||
switch.mount(proto)
|
||||
await switch.start()
|
||||
echo "\e[31;1m", $(switch.peerInfo.addrs[0]), "/p2p/", $(switch.peerInfo.peerId), "\e[0m"
|
||||
await sleepAsync(1.hours)
|
||||
|
||||
waitFor main()
|
||||
Reference in New Issue
Block a user