mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-09 14:28:11 -05:00
chore(transports): specify raised exceptions (#1266)
This commit is contained in:
@@ -55,7 +55,9 @@ method stop*(self: RelayTransport) {.async: (raises: []).} =
|
||||
except AsyncQueueEmptyError:
|
||||
continue # checked with self.queue.empty()
|
||||
|
||||
method accept*(self: RelayTransport): Future[Connection] {.async.} =
|
||||
method accept*(
|
||||
self: RelayTransport
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
result = await self.queue.popFirst()
|
||||
|
||||
proc dial*(self: RelayTransport, ma: MultiAddress): Future[Connection] {.async.} =
|
||||
@@ -84,10 +86,10 @@ proc dial*(self: RelayTransport, ma: MultiAddress): Future[Connection] {.async.}
|
||||
rc = RelayConnection.new(conn, 0, 0)
|
||||
return await self.client.dialPeerV2(rc, dstPeerId, @[])
|
||||
except CancelledError as exc:
|
||||
safeClose(rc)
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
if not rc.isNil:
|
||||
await rc.close()
|
||||
safeClose(rc)
|
||||
raise exc
|
||||
|
||||
method dial*(
|
||||
@@ -95,10 +97,15 @@ method dial*(
|
||||
hostname: string,
|
||||
ma: MultiAddress,
|
||||
peerId: Opt[PeerId] = Opt.none(PeerId),
|
||||
): Future[Connection] {.async.} =
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
peerId.withValue(pid):
|
||||
let address = MultiAddress.init($ma & "/p2p/" & $pid).tryGet()
|
||||
result = await self.dial(address)
|
||||
try:
|
||||
let address = MultiAddress.init($ma & "/p2p/" & $pid).tryGet()
|
||||
result = await self.dial(address)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
raise newException(transport.TransportDialError, e.msg, e)
|
||||
|
||||
method handles*(self: RelayTransport, ma: MultiAddress): bool {.gcsafe.} =
|
||||
try:
|
||||
|
||||
@@ -22,6 +22,7 @@ type
|
||||
P2PConnection = connection.Connection
|
||||
QuicConnection = quic.Connection
|
||||
QuicTransportError* = object of transport.TransportError
|
||||
QuicTransportDialError* = object of transport.TransportDialError
|
||||
|
||||
# Stream
|
||||
type QuicStream* = ref object of P2PConnection
|
||||
@@ -74,13 +75,13 @@ method closeImpl*(stream: QuicStream) {.async: (raises: []).} =
|
||||
type QuicSession* = ref object of P2PConnection
|
||||
connection: QuicConnection
|
||||
|
||||
method close*(session: QuicSession) {.async, base.} =
|
||||
await session.connection.close()
|
||||
method close*(session: QuicSession) {.async: (raises: []).} =
|
||||
safeClose(session.connection)
|
||||
await procCall P2PConnection(session).close()
|
||||
|
||||
proc getStream*(
|
||||
session: QuicSession, direction = Direction.In
|
||||
): Future[QuicStream] {.async.} =
|
||||
): Future[QuicStream] {.async: (raises: [CatchableError]).} =
|
||||
var stream: Stream
|
||||
case direction
|
||||
of Direction.In:
|
||||
@@ -108,7 +109,7 @@ method newStream*(
|
||||
except CatchableError as exc:
|
||||
raise newException(MuxerError, exc.msg, exc)
|
||||
|
||||
proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} =
|
||||
proc handleStream(m: QuicMuxer, chann: QuicStream) {.async: (raises: []).} =
|
||||
## call the muxer stream handler for this channel
|
||||
##
|
||||
try:
|
||||
@@ -144,7 +145,7 @@ type QuicTransport* = ref object of Transport
|
||||
func new*(_: type QuicTransport, u: Upgrade): QuicTransport =
|
||||
QuicTransport(upgrader: QuicUpgrade(ms: u.ms))
|
||||
|
||||
method handles*(transport: QuicTransport, address: MultiAddress): bool =
|
||||
method handles*(transport: QuicTransport, address: MultiAddress): bool {.raises: [].} =
|
||||
if not procCall Transport(transport).handles(address):
|
||||
return false
|
||||
QUIC_V1.match(address)
|
||||
@@ -188,27 +189,39 @@ proc wrapConnection(
|
||||
conres.initStream()
|
||||
|
||||
transport.connections.add(conres)
|
||||
proc onClose() {.async.} =
|
||||
await conres.join()
|
||||
proc onClose() {.async: (raises: []).} =
|
||||
await noCancel conres.join()
|
||||
transport.connections.keepItIf(it != conres)
|
||||
trace "Cleaned up client"
|
||||
|
||||
asyncSpawn onClose()
|
||||
return conres
|
||||
|
||||
method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} =
|
||||
doAssert not transport.listener.isNil, "call start() before calling accept()"
|
||||
let connection = await transport.listener.accept()
|
||||
return transport.wrapConnection(connection)
|
||||
method accept*(
|
||||
self: QuicTransport
|
||||
): Future[P2PConnection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
doAssert not self.listener.isNil, "call start() before calling accept()"
|
||||
try:
|
||||
let connection = await self.listener.accept()
|
||||
return self.wrapConnection(connection)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
raise (ref QuicTransportError)(msg: e.msg, parent: e)
|
||||
|
||||
method dial*(
|
||||
transport: QuicTransport,
|
||||
self: QuicTransport,
|
||||
hostname: string,
|
||||
address: MultiAddress,
|
||||
peerId: Opt[PeerId] = Opt.none(PeerId),
|
||||
): Future[P2PConnection] {.async.} =
|
||||
let connection = await dial(initTAddress(address).tryGet)
|
||||
return transport.wrapConnection(connection)
|
||||
): Future[P2PConnection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
try:
|
||||
let connection = await dial(initTAddress(address).tryGet)
|
||||
return self.wrapConnection(connection)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
raise newException(QuicTransportDialError, e.msg, e)
|
||||
|
||||
method upgrade*(
|
||||
self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId]
|
||||
|
||||
@@ -197,7 +197,9 @@ method stop*(self: TcpTransport): Future[void] {.async: (raises: []).} =
|
||||
"No incoming connections possible without start"
|
||||
await noCancel allFutures(self.clients[Direction.Out].mapIt(it.closeWait()))
|
||||
|
||||
method accept*(self: TcpTransport): Future[Connection] =
|
||||
method accept*(
|
||||
self: TcpTransport
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
## accept a new TCP connection, returning nil on non-fatal errors
|
||||
##
|
||||
## Raises an exception when the transport is broken and cannot be used for
|
||||
@@ -206,130 +208,121 @@ method accept*(self: TcpTransport): Future[Connection] =
|
||||
# information is lost and must be logged here instead of being
|
||||
# available to the caller - further refactoring should propagate errors
|
||||
# to the caller instead
|
||||
proc impl(
|
||||
self: TcpTransport
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
proc cancelAcceptFuts() =
|
||||
for fut in self.acceptFuts:
|
||||
if not fut.completed():
|
||||
fut.cancel()
|
||||
|
||||
if not self.running:
|
||||
raise newTransportClosedError()
|
||||
proc cancelAcceptFuts() =
|
||||
for fut in self.acceptFuts:
|
||||
if not fut.completed():
|
||||
fut.cancel()
|
||||
|
||||
if self.servers.len == 0:
|
||||
raise (ref TcpTransportError)(msg: "No listeners configured")
|
||||
elif self.acceptFuts.len == 0:
|
||||
# Holds futures representing ongoing accept calls on multiple servers.
|
||||
self.acceptFuts = self.servers.mapIt(it.accept())
|
||||
if not self.running:
|
||||
raise newTransportClosedError()
|
||||
|
||||
let
|
||||
finished =
|
||||
try:
|
||||
# Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers.
|
||||
await one(self.acceptFuts)
|
||||
except ValueError:
|
||||
raiseAssert "Accept futures should not be empty"
|
||||
except CancelledError as exc:
|
||||
cancelAcceptFuts()
|
||||
raise exc
|
||||
index = self.acceptFuts.find(finished)
|
||||
if self.servers.len == 0:
|
||||
raise (ref TcpTransportError)(msg: "No listeners configured")
|
||||
elif self.acceptFuts.len == 0:
|
||||
# Holds futures representing ongoing accept calls on multiple servers.
|
||||
self.acceptFuts = self.servers.mapIt(it.accept())
|
||||
|
||||
# A new connection has been accepted. The corresponding server should immediately start accepting another connection.
|
||||
# Thus we replace the completed future with a new one by calling accept on the same server again.
|
||||
self.acceptFuts[index] = self.servers[index].accept()
|
||||
let transp =
|
||||
let
|
||||
finished =
|
||||
try:
|
||||
await finished
|
||||
except TransportTooManyError as exc:
|
||||
debug "Too many files opened", description = exc.msg
|
||||
return nil
|
||||
except TransportAbortedError as exc:
|
||||
debug "Connection aborted", description = exc.msg
|
||||
return nil
|
||||
except TransportUseClosedError as exc:
|
||||
raise newTransportClosedError(exc)
|
||||
except TransportOsError as exc:
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
except common.TransportError as exc: # Needed for chronos 4.0.0 support
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
# Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers.
|
||||
await one(self.acceptFuts)
|
||||
except ValueError:
|
||||
raiseAssert "Accept futures should not be empty"
|
||||
except CancelledError as exc:
|
||||
cancelAcceptFuts()
|
||||
raise exc
|
||||
index = self.acceptFuts.find(finished)
|
||||
|
||||
if not self.running: # Stopped while waiting
|
||||
await transp.closeWait()
|
||||
raise newTransportClosedError()
|
||||
# A new connection has been accepted. The corresponding server should immediately start accepting another connection.
|
||||
# Thus we replace the completed future with a new one by calling accept on the same server again.
|
||||
self.acceptFuts[index] = self.servers[index].accept()
|
||||
let transp =
|
||||
try:
|
||||
await finished
|
||||
except TransportTooManyError as exc:
|
||||
debug "Too many files opened", description = exc.msg
|
||||
return nil
|
||||
except TransportAbortedError as exc:
|
||||
debug "Connection aborted", description = exc.msg
|
||||
return nil
|
||||
except TransportUseClosedError as exc:
|
||||
raise newTransportClosedError(exc)
|
||||
except TransportOsError as exc:
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
except common.TransportError as exc: # Needed for chronos 4.0.0 support
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
except CancelledError as exc:
|
||||
cancelAcceptFuts()
|
||||
raise exc
|
||||
|
||||
let remote =
|
||||
try:
|
||||
transp.remoteAddress
|
||||
except TransportOsError as exc:
|
||||
# The connection had errors / was closed before `await` returned control
|
||||
await transp.closeWait()
|
||||
debug "Cannot read remote address", description = exc.msg
|
||||
return nil
|
||||
if not self.running: # Stopped while waiting
|
||||
safeCloseWait(transp)
|
||||
raise newTransportClosedError()
|
||||
|
||||
let observedAddr =
|
||||
MultiAddress.init(remote).expect("Can initialize from remote address")
|
||||
self.connHandler(transp, Opt.some(observedAddr), Direction.In)
|
||||
let remote =
|
||||
try:
|
||||
transp.remoteAddress
|
||||
except TransportOsError as exc:
|
||||
# The connection had errors / was closed before `await` returned control
|
||||
safeCloseWait(transp)
|
||||
debug "Cannot read remote address", description = exc.msg
|
||||
return nil
|
||||
|
||||
impl(self)
|
||||
let observedAddr =
|
||||
MultiAddress.init(remote).expect("Can initialize from remote address")
|
||||
self.connHandler(transp, Opt.some(observedAddr), Direction.In)
|
||||
|
||||
method dial*(
|
||||
self: TcpTransport,
|
||||
hostname: string,
|
||||
address: MultiAddress,
|
||||
peerId: Opt[PeerId] = Opt.none(PeerId),
|
||||
): Future[Connection] =
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
## dial a peer
|
||||
proc impl(
|
||||
self: TcpTransport, hostname: string, address: MultiAddress, peerId: Opt[PeerId]
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
if self.stopping:
|
||||
raise newTransportClosedError()
|
||||
if self.stopping:
|
||||
raise newTransportClosedError()
|
||||
|
||||
let ta = initTAddress(address).valueOr:
|
||||
raise (ref TcpTransportError)(msg: "Unsupported address: " & $address)
|
||||
let ta = initTAddress(address).valueOr:
|
||||
raise (ref TcpTransportError)(msg: "Unsupported address: " & $address)
|
||||
|
||||
trace "Dialing remote peer", address = $address
|
||||
let transp =
|
||||
try:
|
||||
await(
|
||||
if self.networkReachability == NetworkReachability.NotReachable and
|
||||
self.addrs.len > 0:
|
||||
let local = initTAddress(self.addrs[0]).expect("self address is valid")
|
||||
self.clientFlags.incl(SocketFlags.ReusePort)
|
||||
connect(ta, flags = self.clientFlags, localAddress = local)
|
||||
else:
|
||||
connect(ta, flags = self.clientFlags)
|
||||
)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
trace "Dialing remote peer", address = $address
|
||||
let transp =
|
||||
try:
|
||||
await(
|
||||
if self.networkReachability == NetworkReachability.NotReachable and
|
||||
self.addrs.len > 0:
|
||||
let local = initTAddress(self.addrs[0]).expect("self address is valid")
|
||||
self.clientFlags.incl(SocketFlags.ReusePort)
|
||||
connect(ta, flags = self.clientFlags, localAddress = local)
|
||||
else:
|
||||
connect(ta, flags = self.clientFlags)
|
||||
)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
|
||||
# If `stop` is called after `connect` but before `await` returns, we might
|
||||
# end up with a race condition where `stop` returns but not all connections
|
||||
# have been closed - we drop connections in this case in order not to leak
|
||||
# them
|
||||
if self.stopping:
|
||||
# Stopped while waiting for new connection
|
||||
await transp.closeWait()
|
||||
raise newTransportClosedError()
|
||||
# If `stop` is called after `connect` but before `await` returns, we might
|
||||
# end up with a race condition where `stop` returns but not all connections
|
||||
# have been closed - we drop connections in this case in order not to leak
|
||||
# them
|
||||
if self.stopping:
|
||||
# Stopped while waiting for new connection
|
||||
safeCloseWait(transp)
|
||||
raise newTransportClosedError()
|
||||
|
||||
let observedAddr =
|
||||
try:
|
||||
MultiAddress.init(transp.remoteAddress).expect("remote address is valid")
|
||||
except TransportOsError as exc:
|
||||
await transp.closeWait()
|
||||
raise (ref TcpTransportError)(msg: exc.msg)
|
||||
let observedAddr =
|
||||
try:
|
||||
MultiAddress.init(transp.remoteAddress).expect("remote address is valid")
|
||||
except TransportOsError as exc:
|
||||
safeCloseWait(transp)
|
||||
raise (ref TcpTransportError)(msg: exc.msg)
|
||||
|
||||
self.connHandler(transp, Opt.some(observedAddr), Direction.Out)
|
||||
self.connHandler(transp, Opt.some(observedAddr), Direction.Out)
|
||||
|
||||
impl(self, hostname, address, peerId)
|
||||
|
||||
method handles*(t: TcpTransport, address: MultiAddress): bool =
|
||||
method handles*(t: TcpTransport, address: MultiAddress): bool {.raises: [].} =
|
||||
if procCall Transport(t).handles(address):
|
||||
if address.protocols.isOk:
|
||||
return TCP.match(address)
|
||||
|
||||
@@ -94,26 +94,37 @@ proc handlesStart(address: MultiAddress): bool {.gcsafe.} =
|
||||
|
||||
proc connectToTorServer(
|
||||
transportAddress: TransportAddress
|
||||
): Future[StreamTransport] {.async.} =
|
||||
let transp = await connect(transportAddress)
|
||||
try:
|
||||
discard await transp.write(
|
||||
@[Socks5ProtocolVersion, NMethods, Socks5AuthMethod.NoAuth.byte]
|
||||
): Future[StreamTransport] {.
|
||||
async: (
|
||||
raises: [
|
||||
Socks5VersionError, Socks5AuthFailedError, Socks5ServerReplyError, LPError,
|
||||
common.TransportError, CancelledError,
|
||||
]
|
||||
)
|
||||
let
|
||||
serverReply = await transp.read(2)
|
||||
socks5ProtocolVersion = serverReply[0]
|
||||
serverSelectedMethod = serverReply[1]
|
||||
if socks5ProtocolVersion != Socks5ProtocolVersion:
|
||||
raise newException(Socks5VersionError, "Unsupported socks version")
|
||||
if serverSelectedMethod != Socks5AuthMethod.NoAuth.byte:
|
||||
raise newException(Socks5AuthFailedError, "Unsupported auth method")
|
||||
return transp
|
||||
except CatchableError as err:
|
||||
await transp.closeWait()
|
||||
raise err
|
||||
.} =
|
||||
let transp = await connect(transportAddress)
|
||||
discard
|
||||
await transp.write(@[Socks5ProtocolVersion, NMethods, Socks5AuthMethod.NoAuth.byte])
|
||||
let
|
||||
serverReply = await transp.read(2)
|
||||
socks5ProtocolVersion = serverReply[0]
|
||||
serverSelectedMethod = serverReply[1]
|
||||
if socks5ProtocolVersion != Socks5ProtocolVersion:
|
||||
raise newException(Socks5VersionError, "Unsupported socks version")
|
||||
if serverSelectedMethod != Socks5AuthMethod.NoAuth.byte:
|
||||
raise newException(Socks5AuthFailedError, "Unsupported auth method")
|
||||
return transp
|
||||
|
||||
proc readServerReply(transp: StreamTransport) {.async.} =
|
||||
proc readServerReply(
|
||||
transp: StreamTransport
|
||||
) {.
|
||||
async: (
|
||||
raises: [
|
||||
Socks5VersionError, Socks5ServerReplyError, LPError, common.TransportError,
|
||||
CancelledError,
|
||||
]
|
||||
)
|
||||
.} =
|
||||
## The specification for this code is defined on
|
||||
## [link text](https://www.rfc-editor.org/rfc/rfc1928#section-5)
|
||||
## and [link text](https://www.rfc-editor.org/rfc/rfc1928#section-6).
|
||||
@@ -129,10 +140,9 @@ proc readServerReply(transp: StreamTransport) {.async.} =
|
||||
if serverReply != Socks5ReplyType.Succeeded.byte:
|
||||
var socks5ReplyType: Socks5ReplyType
|
||||
if socks5ReplyType.checkedEnumAssign(serverReply):
|
||||
raise
|
||||
newException(Socks5ServerReplyError, fmt"Server reply error: {socks5ReplyType}")
|
||||
raise newException(Socks5ServerReplyError, "Server reply error")
|
||||
else:
|
||||
raise newException(LPError, fmt"Unexpected server reply: {serverReply}")
|
||||
raise newException(LPError, "Unexpected server reply")
|
||||
let atyp = firstFourOctets[3]
|
||||
case atyp
|
||||
of Socks5AddressType.IPv4.byte:
|
||||
@@ -147,13 +157,13 @@ proc readServerReply(transp: StreamTransport) {.async.} =
|
||||
|
||||
proc parseOnion3(
|
||||
address: MultiAddress
|
||||
): (byte, seq[byte], seq[byte]) {.raises: [LPError, ValueError].} =
|
||||
): (byte, seq[byte], seq[byte]) {.raises: [LPError].} =
|
||||
var addressArray = ($address).split('/')
|
||||
if addressArray.len < 2:
|
||||
raise newException(LPError, fmt"Onion address not supported {address}")
|
||||
raise newException(LPError, "Onion address not supported")
|
||||
addressArray = addressArray[2].split(':')
|
||||
if addressArray.len == 0:
|
||||
raise newException(LPError, fmt"Onion address not supported {address}")
|
||||
raise newException(LPError, "Onion address not supported")
|
||||
let
|
||||
addressStr = addressArray[0] & ".onion"
|
||||
dstAddr = @(uint8(addressStr.len).toBytes()) & addressStr.toBytes()
|
||||
@@ -162,14 +172,14 @@ proc parseOnion3(
|
||||
|
||||
proc parseIpTcp(
|
||||
address: MultiAddress
|
||||
): (byte, seq[byte], seq[byte]) {.raises: [LPError, ValueError].} =
|
||||
): (byte, seq[byte], seq[byte]) {.raises: [LPError].} =
|
||||
let (codec, atyp) =
|
||||
if IPv4Tcp.match(address):
|
||||
(multiCodec("ip4"), Socks5AddressType.IPv4.byte)
|
||||
elif IPv6Tcp.match(address):
|
||||
(multiCodec("ip6"), Socks5AddressType.IPv6.byte)
|
||||
else:
|
||||
raise newException(LPError, fmt"IP address not supported {address}")
|
||||
raise newException(LPError, "IP address not supported")
|
||||
let
|
||||
dstAddr = address[codec].tryGet().protoArgument().tryGet()
|
||||
dstPort = address[multiCodec("tcp")].tryGet().protoArgument().tryGet()
|
||||
@@ -177,14 +187,23 @@ proc parseIpTcp(
|
||||
|
||||
proc parseDnsTcp(
|
||||
address: MultiAddress
|
||||
): (byte, seq[byte], seq[byte]) {.raises: [LPError, ValueError].} =
|
||||
): (byte, seq[byte], seq[byte]) {.raises: [LPError].} =
|
||||
let
|
||||
dnsAddress = address[multiCodec("dns")].tryGet().protoArgument().tryGet()
|
||||
dstAddr = @(uint8(dnsAddress.len).toBytes()) & dnsAddress
|
||||
dstPort = address[multiCodec("tcp")].tryGet().protoArgument().tryGet()
|
||||
(Socks5AddressType.FQDN.byte, dstAddr, dstPort)
|
||||
|
||||
proc dialPeer(transp: StreamTransport, address: MultiAddress) {.async.} =
|
||||
proc dialPeer(
|
||||
transp: StreamTransport, address: MultiAddress
|
||||
) {.
|
||||
async: (
|
||||
raises: [
|
||||
LPError, common.TransportError, CancelledError, Socks5ServerReplyError,
|
||||
Socks5VersionError,
|
||||
]
|
||||
)
|
||||
.} =
|
||||
let (atyp, dstAddr, dstPort) =
|
||||
if Onion3.match(address):
|
||||
parseOnion3(address)
|
||||
@@ -193,7 +212,7 @@ proc dialPeer(transp: StreamTransport, address: MultiAddress) {.async.} =
|
||||
elif DnsTcp.match(address):
|
||||
parseDnsTcp(address)
|
||||
else:
|
||||
raise newException(LPError, fmt"Address not supported: {address}")
|
||||
raise newException(LPError, "Address not supported")
|
||||
|
||||
let reserved = byte(0)
|
||||
let request =
|
||||
@@ -207,20 +226,25 @@ method dial*(
|
||||
hostname: string,
|
||||
address: MultiAddress,
|
||||
peerId: Opt[PeerId] = Opt.none(PeerId),
|
||||
): Future[Connection] {.async.} =
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
## dial a peer
|
||||
##
|
||||
if not handlesDial(address):
|
||||
raise newException(LPError, fmt"Address not supported: {address}")
|
||||
raise newException(TransportDialError, "Address not supported")
|
||||
trace "Dialing remote peer", address = $address
|
||||
let transp = await connectToTorServer(self.transportAddress)
|
||||
|
||||
var transp: StreamTransport
|
||||
|
||||
try:
|
||||
transp = await connectToTorServer(self.transportAddress)
|
||||
await dialPeer(transp, address)
|
||||
return self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out)
|
||||
except CatchableError as err:
|
||||
await transp.closeWait()
|
||||
raise err
|
||||
except CancelledError as e:
|
||||
safeCloseWait(transp)
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
safeCloseWait(transp)
|
||||
raise newException(transport.TransportDialError, e.msg, e)
|
||||
|
||||
method start*(
|
||||
self: TorTransport, addrs: seq[MultiAddress]
|
||||
@@ -249,7 +273,9 @@ method start*(
|
||||
"Tor Transport couldn't start, no supported addr was provided.",
|
||||
)
|
||||
|
||||
method accept*(self: TorTransport): Future[Connection] {.async.} =
|
||||
method accept*(
|
||||
self: TorTransport
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
## accept a new Tor connection
|
||||
##
|
||||
let conn = await self.tcpTransport.accept()
|
||||
@@ -262,7 +288,7 @@ method stop*(self: TorTransport) {.async: (raises: []).} =
|
||||
await procCall Transport(self).stop() # call base
|
||||
await self.tcpTransport.stop()
|
||||
|
||||
method handles*(t: TorTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||
method handles*(t: TorTransport, address: MultiAddress): bool {.gcsafe, raises: [].} =
|
||||
if procCall Transport(t).handles(address):
|
||||
return handlesDial(address) or handlesStart(address)
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ type
|
||||
TransportError* = object of LPError
|
||||
TransportInvalidAddrError* = object of TransportError
|
||||
TransportClosedError* = object of TransportError
|
||||
TransportDialError* = object of TransportError
|
||||
|
||||
Transport* = ref object of RootObj
|
||||
addrs*: seq[MultiAddress]
|
||||
@@ -57,7 +58,11 @@ method stop*(self: Transport) {.base, async: (raises: []).} =
|
||||
trace "stopping transport", address = $self.addrs
|
||||
self.running = false
|
||||
|
||||
method accept*(self: Transport): Future[Connection] {.base, gcsafe.} =
|
||||
method accept*(
|
||||
self: Transport
|
||||
): Future[Connection] {.
|
||||
gcsafe, base, async: (raises: [TransportError, CancelledError])
|
||||
.} =
|
||||
## accept incoming connections
|
||||
##
|
||||
|
||||
@@ -68,7 +73,9 @@ method dial*(
|
||||
hostname: string,
|
||||
address: MultiAddress,
|
||||
peerId: Opt[PeerId] = Opt.none(PeerId),
|
||||
): Future[Connection] {.base, gcsafe.} =
|
||||
): Future[Connection] {.
|
||||
base, gcsafe, async: (raises: [TransportError, CancelledError])
|
||||
.} =
|
||||
## dial a peer
|
||||
##
|
||||
|
||||
@@ -87,7 +94,9 @@ method upgrade*(
|
||||
##
|
||||
self.upgrader.upgrade(conn, peerId)
|
||||
|
||||
method handles*(self: Transport, address: MultiAddress): bool {.base, gcsafe.} =
|
||||
method handles*(
|
||||
self: Transport, address: MultiAddress
|
||||
): bool {.base, gcsafe, raises: [].} =
|
||||
## check if transport supports the multiaddress
|
||||
##
|
||||
# by default we skip circuit addresses to avoid
|
||||
@@ -96,3 +105,17 @@ method handles*(self: Transport, address: MultiAddress): bool {.base, gcsafe.} =
|
||||
return false
|
||||
|
||||
protocols.filterIt(it == multiCodec("p2p-circuit")).len == 0
|
||||
|
||||
template safeCloseWait*(stream: untyped) =
|
||||
if not isNil(stream):
|
||||
try:
|
||||
await noCancel stream.closeWait()
|
||||
except CatchableError as e:
|
||||
trace "Error closing", description = e.msg
|
||||
|
||||
template safeClose*(stream: untyped) =
|
||||
if not isNil(stream):
|
||||
try:
|
||||
await noCancel stream.close()
|
||||
except CatchableError as e:
|
||||
trace "Error closing", description = e.msg
|
||||
|
||||
@@ -218,7 +218,10 @@ method stop*(self: WsTransport) {.async: (raises: []).} =
|
||||
|
||||
proc connHandler(
|
||||
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
|
||||
): Future[Connection] {.async.} =
|
||||
): Future[Connection] {.async: (raises: [CatchableError]).} =
|
||||
## Returning CatchableError is fine because we later handle different exceptions.
|
||||
##
|
||||
|
||||
let observedAddr =
|
||||
try:
|
||||
let
|
||||
@@ -233,21 +236,23 @@ proc connHandler(
|
||||
except CatchableError as exc:
|
||||
trace "Failed to create observedAddr", description = exc.msg
|
||||
if not (isNil(stream) and stream.stream.reader.closed):
|
||||
await stream.close()
|
||||
safeClose(stream)
|
||||
raise exc
|
||||
|
||||
let conn = WsStream.new(stream, dir, Opt.some(observedAddr))
|
||||
|
||||
self.connections[dir].add(conn)
|
||||
proc onClose() {.async.} =
|
||||
await conn.session.stream.reader.join()
|
||||
proc onClose() {.async: (raises: []).} =
|
||||
await noCancel conn.session.stream.reader.join()
|
||||
self.connections[dir].keepItIf(it != conn)
|
||||
trace "Cleaned up client"
|
||||
|
||||
asyncSpawn onClose()
|
||||
return conn
|
||||
|
||||
method accept*(self: WsTransport): Future[Connection] {.async.} =
|
||||
method accept*(
|
||||
self: WsTransport
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
## accept a new WS connection
|
||||
##
|
||||
|
||||
@@ -260,10 +265,15 @@ method accept*(self: WsTransport): Future[Connection] {.async.} =
|
||||
if self.acceptFuts.len <= 0:
|
||||
return
|
||||
|
||||
let
|
||||
finished = await one(self.acceptFuts)
|
||||
index = self.acceptFuts.find(finished)
|
||||
let finished =
|
||||
try:
|
||||
await one(self.acceptFuts)
|
||||
except ValueError:
|
||||
raiseAssert("already checked with if")
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
|
||||
let index = self.acceptFuts.find(finished)
|
||||
self.acceptFuts[index] = self.httpservers[index].accept()
|
||||
|
||||
try:
|
||||
@@ -276,7 +286,7 @@ method accept*(self: WsTransport): Future[Connection] {.async.} =
|
||||
|
||||
return await self.connHandler(wstransp, isSecure, Direction.In)
|
||||
except CatchableError as exc:
|
||||
await req.stream.closeWait()
|
||||
await noCancel req.stream.closeWait()
|
||||
raise exc
|
||||
except WebSocketError as exc:
|
||||
debug "Websocket Error", description = exc.msg
|
||||
@@ -299,21 +309,22 @@ method accept*(self: WsTransport): Future[Connection] {.async.} =
|
||||
debug "OS Error", description = exc.msg
|
||||
except CatchableError as exc:
|
||||
info "Unexpected error accepting connection", description = exc.msg
|
||||
raise exc
|
||||
raise newException(transport.TransportError, exc.msg, exc)
|
||||
|
||||
method dial*(
|
||||
self: WsTransport,
|
||||
hostname: string,
|
||||
address: MultiAddress,
|
||||
peerId: Opt[PeerId] = Opt.none(PeerId),
|
||||
): Future[Connection] {.async.} =
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
## dial a peer
|
||||
##
|
||||
|
||||
trace "Dialing remote peer", address = $address
|
||||
var transp: websock.WSSession
|
||||
|
||||
let
|
||||
secure = WSS.match(address)
|
||||
try:
|
||||
let secure = WSS.match(address)
|
||||
transp = await WebSocket.connect(
|
||||
address.initTAddress().tryGet(),
|
||||
"",
|
||||
@@ -321,14 +332,15 @@ method dial*(
|
||||
hostName = hostname,
|
||||
flags = self.tlsFlags,
|
||||
)
|
||||
|
||||
try:
|
||||
return await self.connHandler(transp, secure, Direction.Out)
|
||||
except CatchableError as exc:
|
||||
await transp.close()
|
||||
raise exc
|
||||
except CancelledError as e:
|
||||
safeClose(transp)
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
safeClose(transp)
|
||||
raise newException(transport.TransportDialError, e.msg, e)
|
||||
|
||||
method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||
method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe, raises: [].} =
|
||||
if procCall Transport(t).handles(address):
|
||||
if address.protocols.isOk:
|
||||
return WebSockets.match(address)
|
||||
|
||||
Reference in New Issue
Block a user