mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-09 02:38:19 -05:00
feat: quic (#1265)
Co-authored-by: vladopajic <vladopajic@users.noreply.github.com>
This commit is contained in:
4
.pinned
4
.pinned
@@ -6,9 +6,9 @@ faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af61
|
||||
httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
|
||||
json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
|
||||
metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
|
||||
ngtcp2;https://github.com/status-im/nim-ngtcp2@#6834f4756b6af58356ac9c4fef3d71db3c3ae5fe
|
||||
ngtcp2;https://github.com/status-im/nim-ngtcp2@#9456daa178c655bccd4a3c78ad3b8cce1f0add73
|
||||
nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
|
||||
quic;https://github.com/status-im/nim-quic.git@#ddcb31ffb74b5460ab37fd13547eca90594248bc
|
||||
quic;https://github.com/status-im/nim-quic.git@#51f20d2cbd79d02ec25ff29d87ee192d2b4cc2af
|
||||
results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad
|
||||
secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be
|
||||
serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35
|
||||
|
||||
@@ -11,7 +11,7 @@ requires "nim >= 1.6.0",
|
||||
"nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
|
||||
"chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew#head",
|
||||
"websock", "unittest2",
|
||||
"https://github.com/status-im/nim-quic.git#ddcb31ffb74b5460ab37fd13547eca90594248bc"
|
||||
"https://github.com/status-im/nim-quic.git#51f20d2cbd79d02ec25ff29d87ee192d2b4cc2af"
|
||||
|
||||
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
|
||||
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)
|
||||
|
||||
@@ -38,7 +38,8 @@ import services/wildcardresolverservice
|
||||
export switch, peerid, peerinfo, connection, multiaddress, crypto, errors
|
||||
|
||||
type
|
||||
TransportProvider* {.public.} = proc(upgr: Upgrade): Transport {.gcsafe, raises: [].}
|
||||
TransportProvider* {.public.} =
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport {.gcsafe, raises: [].}
|
||||
|
||||
SecureProtocol* {.pure.} = enum
|
||||
Noise
|
||||
@@ -151,7 +152,7 @@ proc withTransport*(
|
||||
let switch = SwitchBuilder
|
||||
.new()
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
TcpTransport.new(flags, upgr)
|
||||
)
|
||||
.build()
|
||||
@@ -162,7 +163,7 @@ proc withTcpTransport*(
|
||||
b: SwitchBuilder, flags: set[ServerFlags] = {}
|
||||
): SwitchBuilder {.public.} =
|
||||
b.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
TcpTransport.new(flags, upgr)
|
||||
)
|
||||
|
||||
@@ -270,7 +271,7 @@ proc build*(b: SwitchBuilder): Switch {.raises: [LPError], public.} =
|
||||
let transports = block:
|
||||
var transports: seq[Transport]
|
||||
for tProvider in b.transports:
|
||||
transports.add(tProvider(muxedUpgrade))
|
||||
transports.add(tProvider(muxedUpgrade, seckey))
|
||||
transports
|
||||
|
||||
if b.secureManagers.len == 0:
|
||||
|
||||
@@ -31,14 +31,14 @@ method connect*(
|
||||
## a protocol
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Dial.connect] abstract method not implemented!")
|
||||
|
||||
method connect*(
|
||||
self: Dial, address: MultiAddress, allowUnknownPeerId = false
|
||||
): Future[PeerId] {.base, async: (raises: [DialFailedError, CancelledError]).} =
|
||||
## Connects to a peer and retrieve its PeerId
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Dial.connect] abstract method not implemented!")
|
||||
|
||||
method dial*(
|
||||
self: Dial, peerId: PeerId, protos: seq[string]
|
||||
@@ -47,7 +47,7 @@ method dial*(
|
||||
## existing connection
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Dial.dial] abstract method not implemented!")
|
||||
|
||||
method dial*(
|
||||
self: Dial,
|
||||
@@ -60,14 +60,14 @@ method dial*(
|
||||
## a connection if one doesn't exist already
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Dial.dial] abstract method not implemented!")
|
||||
|
||||
method addTransport*(self: Dial, transport: Transport) {.base.} =
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Dial.addTransport] abstract method not implemented!")
|
||||
|
||||
method tryDial*(
|
||||
self: Dial, peerId: PeerId, addrs: seq[MultiAddress]
|
||||
): Future[Opt[MultiAddress]] {.
|
||||
base, async: (raises: [DialFailedError, CancelledError])
|
||||
.} =
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Dial.tryDial] abstract method not implemented!")
|
||||
|
||||
@@ -59,7 +59,7 @@ proc `{}`*[T](pa: PeerAttributes, t: typedesc[T]): Opt[T] =
|
||||
|
||||
proc `[]`*[T](pa: PeerAttributes, t: typedesc[T]): T {.raises: [KeyError].} =
|
||||
pa{T}.valueOr:
|
||||
raise newException(KeyError, "Attritute not found")
|
||||
raise newException(KeyError, "Attribute not found")
|
||||
|
||||
proc match*(pa, candidate: PeerAttributes): bool =
|
||||
for f in pa.attributes:
|
||||
@@ -86,12 +86,12 @@ type
|
||||
method request*(
|
||||
self: DiscoveryInterface, pa: PeerAttributes
|
||||
) {.base, async: (raises: [DiscoveryError, CancelledError]).} =
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[DiscoveryInterface.request] abstract method not implemented!")
|
||||
|
||||
method advertise*(
|
||||
self: DiscoveryInterface
|
||||
) {.base, async: (raises: [CancelledError, AdvertiseError]).} =
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[DiscoveryInterface.advertise] abstract method not implemented!")
|
||||
|
||||
type
|
||||
DiscoveryQuery* = ref object
|
||||
|
||||
@@ -52,7 +52,7 @@ method newStream*(
|
||||
): Future[Connection] {.
|
||||
base, async: (raises: [CancelledError, LPStreamError, MuxerError], raw: true)
|
||||
.} =
|
||||
raiseAssert("Not implemented!")
|
||||
raiseAssert("[Muxer.newStream] abstract method not implemented!")
|
||||
|
||||
method close*(m: Muxer) {.base, async: (raises: []).} =
|
||||
if m.connection != nil:
|
||||
@@ -68,4 +68,4 @@ proc new*(
|
||||
muxerProvider
|
||||
|
||||
method getStreams*(m: Muxer): seq[Connection] {.base, gcsafe.} =
|
||||
raiseAssert("Not implemented!")
|
||||
raiseAssert("[Muxer.getStreams] abstract method not implemented!")
|
||||
|
||||
@@ -22,7 +22,7 @@ method resolveTxt*(
|
||||
self: NameResolver, address: string
|
||||
): Future[seq[string]] {.async: (raises: [CancelledError]), base.} =
|
||||
## Get TXT record
|
||||
raiseAssert "Not implemented!"
|
||||
raiseAssert "[NameResolver.resolveTxt] abstract method not implemented!"
|
||||
|
||||
method resolveIp*(
|
||||
self: NameResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
|
||||
@@ -30,7 +30,7 @@ method resolveIp*(
|
||||
async: (raises: [CancelledError, TransportAddressError]), base
|
||||
.} =
|
||||
## Resolve the specified address
|
||||
raiseAssert "Not implemented!"
|
||||
raiseAssert "[NameResolver.resolveIp] abstract method not implemented!"
|
||||
|
||||
proc getHostname*(ma: MultiAddress): string =
|
||||
let
|
||||
|
||||
@@ -82,7 +82,7 @@ method readMessage*(
|
||||
): Future[seq[byte]] {.
|
||||
async: (raises: [CancelledError, LPStreamError], raw: true), base
|
||||
.} =
|
||||
raiseAssert("Not implemented!")
|
||||
raiseAssert("[SecureConn.readMessage] abstract method not implemented!")
|
||||
|
||||
method getWrapped*(s: SecureConn): Connection =
|
||||
s.stream
|
||||
@@ -92,7 +92,7 @@ method handshake*(
|
||||
): Future[SecureConn] {.
|
||||
async: (raises: [CancelledError, LPStreamError], raw: true), base
|
||||
.} =
|
||||
raiseAssert("Not implemented!")
|
||||
raiseAssert("[Secure.handshake] abstract method not implemented!")
|
||||
|
||||
proc handleConn(
|
||||
s: Secure, conn: Connection, initiator: bool, peerId: Opt[PeerId]
|
||||
|
||||
@@ -124,7 +124,7 @@ proc timeoutMonitor(s: Connection) {.async: (raises: []).} =
|
||||
return
|
||||
|
||||
method getWrapped*(s: Connection): Connection {.base.} =
|
||||
raiseAssert("Not implemented!")
|
||||
raiseAssert("[Connection.getWrapped] abstract method not implemented!")
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
proc setShortAgent*(s: Connection, shortAgent: string) =
|
||||
|
||||
@@ -133,7 +133,7 @@ method readOnce*(
|
||||
## Reads whatever is available in the stream,
|
||||
## up to `nbytes`. Will block if nothing is
|
||||
## available
|
||||
raiseAssert("Not implemented!")
|
||||
raiseAssert("[LPStream.readOnce] abstract method not implemented!")
|
||||
|
||||
proc readExactly*(
|
||||
s: LPStream, pbytes: pointer, nbytes: int
|
||||
@@ -242,7 +242,7 @@ method write*(
|
||||
async: (raises: [CancelledError, LPStreamError], raw: true), base, public
|
||||
.} =
|
||||
# Write `msg` to stream, waiting for the write to be finished
|
||||
raiseAssert("Not implemented!")
|
||||
raiseAssert("[LPStream.write] abstract method not implemented!")
|
||||
|
||||
proc writeLp*(
|
||||
s: LPStream, msg: openArray[byte]
|
||||
|
||||
@@ -77,7 +77,7 @@ method setup*(
|
||||
return true
|
||||
|
||||
method run*(self: Service, switch: Switch) {.base, async: (raises: [CancelledError]).} =
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Service.run] abstract method not implemented!")
|
||||
|
||||
method stop*(
|
||||
self: Service, switch: Switch
|
||||
|
||||
@@ -2,6 +2,7 @@ import std/sequtils
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/quic
|
||||
import results
|
||||
import ../multiaddress
|
||||
import ../multicodec
|
||||
import ../stream/connection
|
||||
@@ -9,6 +10,7 @@ import ../wire
|
||||
import ../muxers/muxer
|
||||
import ../upgrademngrs/upgrade
|
||||
import ./transport
|
||||
import tls/certificate
|
||||
|
||||
export multiaddress
|
||||
export multicodec
|
||||
@@ -24,6 +26,8 @@ type
|
||||
QuicTransportError* = object of transport.TransportError
|
||||
QuicTransportDialError* = object of transport.TransportDialError
|
||||
|
||||
const alpn = "libp2p"
|
||||
|
||||
# Stream
|
||||
type QuicStream* = ref object of P2PConnection
|
||||
stream: Stream
|
||||
@@ -81,15 +85,19 @@ method close*(session: QuicSession) {.async: (raises: []).} =
|
||||
|
||||
proc getStream*(
|
||||
session: QuicSession, direction = Direction.In
|
||||
): Future[QuicStream] {.async: (raises: [CatchableError]).} =
|
||||
var stream: Stream
|
||||
case direction
|
||||
of Direction.In:
|
||||
stream = await session.connection.incomingStream()
|
||||
of Direction.Out:
|
||||
stream = await session.connection.openStream()
|
||||
await stream.write(@[]) # QUIC streams do not exist until data is sent
|
||||
return QuicStream.new(stream, session.observedAddr, session.peerId)
|
||||
): Future[QuicStream] {.async: (raises: [QuicTransportError]).} =
|
||||
try:
|
||||
var stream: Stream
|
||||
case direction
|
||||
of Direction.In:
|
||||
stream = await session.connection.incomingStream()
|
||||
of Direction.Out:
|
||||
stream = await session.connection.openStream()
|
||||
await stream.write(@[]) # QUIC streams do not exist until data is sent
|
||||
return QuicStream.new(stream, session.observedAddr, session.peerId)
|
||||
except CatchableError as exc:
|
||||
# TODO: incomingStream is using {.async.} with no raises
|
||||
raise (ref QuicTransportError)(msg: exc.msg, parent: exc)
|
||||
|
||||
method getWrapped*(self: QuicSession): P2PConnection =
|
||||
nil
|
||||
@@ -131,7 +139,7 @@ method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} =
|
||||
method close*(m: QuicMuxer) {.async: (raises: []).} =
|
||||
try:
|
||||
await m.quicSession.close()
|
||||
m.handleFut.cancel()
|
||||
m.handleFut.cancelSoon()
|
||||
except CatchableError as exc:
|
||||
discard
|
||||
|
||||
@@ -140,10 +148,31 @@ type QuicUpgrade = ref object of Upgrade
|
||||
|
||||
type QuicTransport* = ref object of Transport
|
||||
listener: Listener
|
||||
client: QuicClient
|
||||
privateKey: PrivateKey
|
||||
connections: seq[P2PConnection]
|
||||
rng: ref HmacDrbgContext
|
||||
|
||||
func new*(_: type QuicTransport, u: Upgrade): QuicTransport =
|
||||
QuicTransport(upgrader: QuicUpgrade(ms: u.ms))
|
||||
proc makeCertificateVerifier(): CertificateVerifier =
|
||||
proc certificateVerifier(certificatesDer: seq[seq[byte]]): bool =
|
||||
if certificatesDer.len != 1:
|
||||
trace "CertificateVerifier: expected one certificate in the chain",
|
||||
cert_count = certificatesDer.len
|
||||
return false
|
||||
|
||||
let cert =
|
||||
try:
|
||||
parse(certificatesDer[0])
|
||||
except CertificateParsingError as e:
|
||||
trace "CertificateVerifier: failed to parse certificate", msg = e.msg
|
||||
return false
|
||||
|
||||
return cert.verify()
|
||||
|
||||
return CustomCertificateVerifier.init(certificateVerifier)
|
||||
|
||||
func new*(_: type QuicTransport, u: Upgrade, privateKey: PrivateKey): QuicTransport =
|
||||
return QuicTransport(upgrader: QuicUpgrade(ms: u.ms), privateKey: privateKey)
|
||||
|
||||
method handles*(transport: QuicTransport, address: MultiAddress): bool {.raises: [].} =
|
||||
if not procCall Transport(transport).handles(address):
|
||||
@@ -155,12 +184,31 @@ method start*(
|
||||
) {.async: (raises: [LPError, transport.TransportError]).} =
|
||||
doAssert self.listener.isNil, "start() already called"
|
||||
#TODO handle multiple addr
|
||||
|
||||
let pubkey = self.privateKey.getPublicKey().valueOr:
|
||||
doAssert false, "could not obtain public key"
|
||||
return
|
||||
|
||||
let keypair = KeyPair(seckey: self.privateKey, pubkey: pubkey)
|
||||
let certTuple = generate(keypair, encodingFormat = EncodingFormat.PEM)
|
||||
|
||||
try:
|
||||
self.listener = listen(initTAddress(addrs[0]).tryGet)
|
||||
if self.rng.isNil:
|
||||
self.rng = newRng()
|
||||
let tlsConfig = TLSConfig.init(
|
||||
certTuple[0], certTuple[1], @[alpn], Opt.some(makeCertificateVerifier())
|
||||
)
|
||||
self.client = QuicClient.init(tlsConfig, rng = self.rng)
|
||||
self.listener =
|
||||
QuicServer.init(tlsConfig, rng = self.rng).listen(initTAddress(addrs[0]).tryGet)
|
||||
await procCall Transport(self).start(addrs)
|
||||
self.addrs[0] =
|
||||
MultiAddress.init(self.listener.localAddress(), IPPROTO_UDP).tryGet() &
|
||||
MultiAddress.init("/quic-v1").get()
|
||||
except QuicConfigError as exc:
|
||||
doAssert false, "invalid quic setup: " & $exc.msg
|
||||
except QuicError as exc:
|
||||
raise (ref QuicTransportError)(msg: exc.msg, parent: exc)
|
||||
except TransportOsError as exc:
|
||||
raise (ref QuicTransportError)(msg: exc.msg, parent: exc)
|
||||
self.running = true
|
||||
@@ -174,32 +222,38 @@ method stop*(transport: QuicTransport) {.async: (raises: []).} =
|
||||
await transport.listener.stop()
|
||||
except CatchableError as exc:
|
||||
trace "Error shutting down Quic transport", description = exc.msg
|
||||
transport.listener.destroy()
|
||||
transport.running = false
|
||||
transport.listener = nil
|
||||
|
||||
proc wrapConnection(
|
||||
transport: QuicTransport, connection: QuicConnection
|
||||
): P2PConnection {.raises: [Defect, TransportOsError, LPError].} =
|
||||
): QuicSession {.raises: [TransportOsError, LPError].} =
|
||||
let
|
||||
remoteAddr = connection.remoteAddress()
|
||||
observedAddr =
|
||||
MultiAddress.init(remoteAddr, IPPROTO_UDP).get() &
|
||||
MultiAddress.init("/quic-v1").get()
|
||||
conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr))
|
||||
conres.initStream()
|
||||
session = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr))
|
||||
|
||||
session.initStream()
|
||||
|
||||
transport.connections.add(session)
|
||||
|
||||
transport.connections.add(conres)
|
||||
proc onClose() {.async: (raises: []).} =
|
||||
await noCancel conres.join()
|
||||
transport.connections.keepItIf(it != conres)
|
||||
await noCancel session.join()
|
||||
transport.connections.keepItIf(it != session)
|
||||
trace "Cleaned up client"
|
||||
|
||||
asyncSpawn onClose()
|
||||
return conres
|
||||
|
||||
return session
|
||||
|
||||
method accept*(
|
||||
self: QuicTransport
|
||||
): Future[P2PConnection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
): Future[connection.Connection] {.
|
||||
async: (raises: [transport.TransportError, CancelledError])
|
||||
.} =
|
||||
doAssert not self.listener.isNil, "call start() before calling accept()"
|
||||
try:
|
||||
let connection = await self.listener.accept()
|
||||
@@ -214,10 +268,12 @@ method dial*(
|
||||
hostname: string,
|
||||
address: MultiAddress,
|
||||
peerId: Opt[PeerId] = Opt.none(PeerId),
|
||||
): Future[P2PConnection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
): Future[connection.Connection] {.
|
||||
async: (raises: [transport.TransportError, CancelledError])
|
||||
.} =
|
||||
try:
|
||||
let connection = await dial(initTAddress(address).tryGet)
|
||||
return self.wrapConnection(connection)
|
||||
let quicConnection = await self.client.dial(initTAddress(address).tryGet)
|
||||
return self.wrapConnection(quicConnection)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
|
||||
@@ -302,7 +302,7 @@ proc new*(
|
||||
flags: set[ServerFlags] = {},
|
||||
): TorSwitch {.raises: [LPError], public.} =
|
||||
var builder = SwitchBuilder.new().withRng(rng).withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
TorTransport.new(torServer, flags, upgr)
|
||||
)
|
||||
if addresses.len != 0:
|
||||
@@ -325,7 +325,7 @@ proc new*(
|
||||
return torSwitch
|
||||
|
||||
method addTransport*(s: TorSwitch, t: Transport) =
|
||||
doAssert(false, "not implemented!")
|
||||
doAssert(false, "[TorSwitch.addTransport ] abstract method not implemented!")
|
||||
|
||||
method getTorTransport*(s: TorSwitch): Transport {.base.} =
|
||||
return s.transports[0]
|
||||
|
||||
@@ -66,7 +66,7 @@ method accept*(
|
||||
## accept incoming connections
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Transport.accept] abstract method not implemented!")
|
||||
|
||||
method dial*(
|
||||
self: Transport,
|
||||
@@ -79,7 +79,7 @@ method dial*(
|
||||
## dial a peer
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
doAssert(false, "[Transport.dial] abstract method not implemented!")
|
||||
|
||||
proc dial*(
|
||||
self: Transport, address: MultiAddress, peerId: Opt[PeerId] = Opt.none(PeerId)
|
||||
|
||||
@@ -45,7 +45,7 @@ type
|
||||
method upgrade*(
|
||||
self: Upgrade, conn: Connection, peerId: Opt[PeerId]
|
||||
): Future[Muxer] {.async: (raises: [CancelledError, LPError], raw: true), base.} =
|
||||
raiseAssert("Not implemented!")
|
||||
raiseAssert("[Upgrade.upgrade] abstract method not implemented!")
|
||||
|
||||
proc secure*(
|
||||
self: Upgrade, conn: Connection, peerId: Opt[PeerId]
|
||||
|
||||
@@ -20,7 +20,7 @@ when defined(windows): import winlean else: import posix
|
||||
const
|
||||
RTRANSPMA* = mapOr(TCP, WebSockets, UNIX)
|
||||
|
||||
TRANSPMA* = mapOr(RTRANSPMA, QUIC, UDP)
|
||||
TRANSPMA* = mapOr(RTRANSPMA, QUIC, QUIC_V1, UDP)
|
||||
|
||||
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
|
||||
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
|
||||
@@ -75,7 +75,7 @@ proc connect*(
|
||||
## ``bufferSize`` is size of internal buffer for transport.
|
||||
##
|
||||
|
||||
if not (RTRANSPMA.match(ma)):
|
||||
if not (TRANSPMA.match(ma)):
|
||||
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
|
||||
|
||||
let transportAddress = initTAddress(ma).tryGet()
|
||||
|
||||
@@ -8,7 +8,7 @@ import ../libp2p/protocols/connectivity/relay/[relay, client, utils]
|
||||
type
|
||||
SwitchCreator = proc(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov: TransportProvider = proc(upgr: Upgrade): Transport =
|
||||
prov: TransportProvider = proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
TcpTransport.new({}, upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.gcsafe, raises: [LPError].}
|
||||
@@ -319,7 +319,7 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
|
||||
|
||||
let nativeNode = swCreator(
|
||||
ma = wsAddress,
|
||||
prov = proc(upgr: Upgrade): Transport =
|
||||
prov = proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
WsTransport.new(upgr),
|
||||
)
|
||||
|
||||
@@ -359,7 +359,7 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
WsTransport.new(upgr)
|
||||
)
|
||||
.withNoise()
|
||||
|
||||
@@ -4,7 +4,7 @@ import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/[relay, c
|
||||
|
||||
proc switchMplexCreator(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov: TransportProvider = proc(upgr: Upgrade): Transport =
|
||||
prov: TransportProvider = proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
TcpTransport.new({}, upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.raises: [LPError].} =
|
||||
@@ -27,7 +27,7 @@ proc switchMplexCreator(
|
||||
|
||||
proc switchYamuxCreator(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov: TransportProvider = proc(upgr: Upgrade): Transport =
|
||||
prov: TransportProvider = proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
TcpTransport.new({}, upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.raises: [LPError].} =
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
{.used.}
|
||||
|
||||
import sequtils
|
||||
import chronos, stew/byteutils
|
||||
import
|
||||
../libp2p/[
|
||||
@@ -18,7 +17,43 @@ import ./helpers, ./commontransport
|
||||
suite "Quic transport":
|
||||
asyncTest "can handle local address":
|
||||
let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()]
|
||||
let transport1 = QuicTransport.new()
|
||||
let privateKey = PrivateKey.random(ECDSA, (newRng())[]).tryGet()
|
||||
let transport1 = QuicTransport.new(Upgrade(), privateKey)
|
||||
await transport1.start(ma)
|
||||
check transport1.handles(transport1.addrs[0])
|
||||
await transport1.stop()
|
||||
#
|
||||
asyncTest "transport e2e":
|
||||
let serverMA = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()]
|
||||
let clientMA = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()]
|
||||
let privateKey = PrivateKey.random(ECDSA, (newRng())[]).tryGet()
|
||||
let server: QuicTransport = QuicTransport.new(Upgrade(), privateKey)
|
||||
await server.start(serverMA)
|
||||
|
||||
proc runClient() {.async.} =
|
||||
let rng = newRng()
|
||||
let privateKey = PrivateKey.random(ECDSA, (rng)[]).tryGet()
|
||||
let client: QuicTransport = QuicTransport.new(Upgrade(), privateKey)
|
||||
await client.start(clientMA)
|
||||
let conn = await client.dial("", server.addrs[0])
|
||||
let stream = await getStream(QuicSession(conn), Direction.Out)
|
||||
await stream.write("client")
|
||||
var resp: array[6, byte]
|
||||
await stream.readExactly(addr resp, 6)
|
||||
await stream.close()
|
||||
check string.fromBytes(resp) == "server"
|
||||
await client.stop()
|
||||
|
||||
proc serverAcceptHandler() {.async.} =
|
||||
let conn = await server.accept()
|
||||
let stream = await getStream(QuicSession(conn), Direction.In)
|
||||
var resp: array[6, byte]
|
||||
await stream.readExactly(addr resp, 6)
|
||||
check string.fromBytes(resp) == "client"
|
||||
|
||||
await stream.write("server")
|
||||
await stream.close()
|
||||
await server.stop()
|
||||
|
||||
asyncSpawn serverAcceptHandler()
|
||||
await runClient()
|
||||
|
||||
@@ -994,7 +994,7 @@ suite "Switch":
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
WsTransport.new(upgr)
|
||||
)
|
||||
.withNameResolver(resolver)
|
||||
@@ -1007,7 +1007,7 @@ suite "Switch":
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
WsTransport.new(upgr)
|
||||
)
|
||||
.withTcpTransport()
|
||||
@@ -1047,8 +1047,8 @@ suite "Switch":
|
||||
.withAddress(quicAddress1)
|
||||
.withRng(crypto.newRng())
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
QuicTransport.new(upgr)
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
QuicTransport.new(upgr, privateKey)
|
||||
)
|
||||
.withNoise()
|
||||
.build()
|
||||
@@ -1058,8 +1058,8 @@ suite "Switch":
|
||||
.withAddress(quicAddress2)
|
||||
.withRng(crypto.newRng())
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
QuicTransport.new(upgr)
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
QuicTransport.new(upgr, privateKey)
|
||||
)
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
@@ -45,7 +45,7 @@ proc main() {.async.} =
|
||||
of "ws":
|
||||
discard switchBuilder
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
|
||||
WsTransport.new(upgr)
|
||||
)
|
||||
.withAddress(MultiAddress.init("/ip4/" & ip & "/tcp/0/ws").tryGet())
|
||||
|
||||
Reference in New Issue
Block a user