diff --git a/quic/api.nim b/quic/api.nim index 541fb91..8825b14 100644 --- a/quic/api.nim +++ b/quic/api.nim @@ -102,10 +102,7 @@ proc listen*( proc dial*( self: QuicClient, address: TransportAddress ): Future[Connection] {. - async: ( - raises: - [CancelledError, CatchableError, TimeOutError, QuicError, TransportOsError] - ) + async: (raises: [CancelledError, TimeOutError, QuicError, TransportOsError]) .} = let tlsBackend = newClientTLSBackend( self.tlsConfig.certificate, self.tlsConfig.key, self.tlsConfig.alpn, @@ -124,12 +121,19 @@ proc dial*( let udp = newDatagramTransport(onReceive) connection = newOutgoingConnection(tlsBackend, udp, address, self.rng) + try: connection.startHandshake() await connection.waitForHandshake() - except CatchableError as exc: - # whatever error happens we need to destroy tlsBackend to free resources + # whatever error happens we need to destroy tlsBackend to free resources + except CancelledError as e: tlsBackend.destroy() - raise exc + raise e + except TimeOutError as e: + tlsBackend.destroy() + raise e + except QuicError as e: + tlsBackend.destroy() + raise e return connection diff --git a/quic/connection.nim b/quic/connection.nim index 442ce21..55b0242 100644 --- a/quic/connection.nim +++ b/quic/connection.nim @@ -42,11 +42,11 @@ proc `onRemoveId=`*(connection: Connection, callback: IdCallback) = proc `onClose=`*(connection: Connection, callback: proc() {.gcsafe, raises: [].}) = connection.onClose = Opt.some(callback) -proc drop*(connection: Connection) {.async.} = +proc drop*(connection: Connection) {.async: (raises: [CancelledError, QuicError]).} = trace "Dropping connection" await connection.quic.drop() -proc close*(connection: Connection) {.async.} = +proc close*(connection: Connection) {.async: (raises: [CancelledError, QuicError]).} = await connection.quic.close() if connection is OutgoingConnection: let outConn = OutgoingConnection(connection) @@ -54,7 +54,7 @@ proc close*(connection: Connection) {.async.} = outConn.tlsBackend.get().destroy() outConn.tlsBackend = Opt.none(TLSBackend) -proc waitClosed*(connection: Connection) {.async.} = +proc waitClosed*(connection: Connection) {.async: (raises: [CancelledError]).} = await connection.closed.wait() if connection is OutgoingConnection: let outConn = OutgoingConnection(connection) @@ -81,7 +81,7 @@ proc startSending(connection: Connection, remote: TransportAddress) = connection.loop = asyncLoop(send) -proc stopSending(connection: Connection) {.async.} = +proc stopSending(connection: Connection) {.async: (raises: [CancelledError]).} = trace "Stopping sending loop" await connection.loop.cancelAndWait() @@ -91,7 +91,7 @@ method closeUdp(connection: Connection) {.async: (raises: []), base.} = method closeUdp(connection: OutgoingConnection) {.async: (raises: []).} = await connection.udp.closeWait() -proc disconnect(connection: Connection) {.async.} = +proc disconnect(connection: Connection) {.async: (raises: [CancelledError]).} = trace "Disconnecting connection" await connection.stopSending() await connection.closeUdp() @@ -111,7 +111,7 @@ proc newIncomingConnection*( newQuicServerConnection(tlsBackend, udp.localAddress, remote, datagram, rng) let closed = newAsyncEvent() let connection = IncomingConnection(udp: udp, quic: quic, closed: closed) - proc onDisconnect() {.async.} = + proc onDisconnect() {.async: (raises: [CancelledError]).} = trace "Calling onDisconnect for newIncomingConnection" await connection.disconnect() @@ -120,7 +120,9 @@ proc newIncomingConnection*( connection.startSending(remote) connection -proc ensureClosed(connection: Connection) {.async.} = +proc ensureClosed( + connection: Connection +) {.async: (raises: [CancelledError, QuicError]).} = ## This will automatically close the connection if there's an idle timeout reported ## by ngtcp2 discard await race(connection.quic.timeout.wait(), connection.closed.wait()) @@ -137,7 +139,7 @@ proc newOutgoingConnection*( let connection = OutgoingConnection( udp: udp, quic: quic, closed: closed, tlsBackend: Opt.some(tlsBackend) ) - proc onDisconnect() {.async.} = + proc onDisconnect() {.async: (raises: [CancelledError]).} = trace "Calling onDisconnect for newOutgoingConnection" await connection.disconnect() @@ -154,7 +156,7 @@ proc startHandshake*(connection: Connection) {.gcsafe.} = proc waitForHandshake*( connection: Connection -) {.async: (raises: [CancelledError, TimeOutError, CatchableError]).} = +) {.async: (raises: [CancelledError, QuicError, TimeOutError]).} = let key = connection.quic.error.register() defer: connection.quic.error.unregister(key) @@ -178,14 +180,22 @@ proc waitForHandshake*( errFut.cancelSoon() handshakeFut.cancelSoon() await connCloseFut - raise newException(TimeOutError, "handshake timed out") + raise newException(TimeOutError, "connection handshake timed out") elif raceFut == errFut: let connCloseFut = connection.close() timeoutFut.cancelSoon() handshakeFut.cancelSoon() await connCloseFut - let err = await errFut - raise newException(QuicError, "connection error: " & err[0]) + + let err = + try: + await errFut + except AsyncEventQueueFullError as e: + raise newException( + QuicError, "connection handshake error: waiting on error: " & e.msg + ) + + raise newException(QuicError, "connection handshake error: " & err[0]) else: errFut.cancelSoon() timeoutFut.cancelSoon() diff --git a/quic/listener.nim b/quic/listener.nim index f965cc7..6eea35d 100644 --- a/quic/listener.nim +++ b/quic/listener.nim @@ -88,15 +88,19 @@ proc newListener*( listener.udp = newDatagramTransport(onReceive, local = address) listener -proc waitForIncoming*(listener: Listener): Future[Connection] {.async.} = +proc waitForIncoming*( + listener: Listener +): Future[Connection] {.async: (raises: [CancelledError]).} = await listener.incoming.get() -proc accept*(listener: Listener): Future[Connection] {.async.} = +proc accept*( + listener: Listener +): Future[Connection] {.async: (raises: [CancelledError, QuicError, TimeOutError]).} = let conn = await listener.waitForIncoming() await conn.waitForHandshake() return conn -proc stop*(listener: Listener) {.async.} = +proc stop*(listener: Listener) {.async: (raises: [CancelledError]).} = await listener.udp.closeWait() proc destroy*(listener: Listener) = diff --git a/quic/transport/ngtcp2/connstate/disconnectingstate.nim b/quic/transport/ngtcp2/connstate/disconnectingstate.nim index 313f8de..494b6e4 100644 --- a/quic/transport/ngtcp2/connstate/disconnectingstate.nim +++ b/quic/transport/ngtcp2/connstate/disconnectingstate.nim @@ -11,7 +11,7 @@ logScope: type DisconnectingConnection* = ref object of ConnectionState connection: Opt[QuicConnection] - disconnect: Future[void] + disconnect: Future[void].Raising([CancelledError, QuicError]) ids: seq[ConnectionId] proc newDisconnectingConnection*( @@ -19,7 +19,9 @@ proc newDisconnectingConnection*( ): DisconnectingConnection = DisconnectingConnection(ids: ids, derCertificates: certificates) -proc callDisconnect(connection: QuicConnection) {.async.} = +proc callDisconnect( + connection: QuicConnection +) {.async: (raises: [CancelledError, QuicError]).} = let disconnect = connection.disconnect.valueOr: return trace "Calling disconnect proc on QuicConnection" @@ -55,21 +57,10 @@ method openStream( ): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} = raise newException(ClosedConnectionError, "connection is disconnecting") -template handleWithQuicError*(body: untyped) = - try: - body - except CancelledError as e: - raise e - except QuicError as e: - raise e - except CatchableError as e: - raise newException(QuicError, e.msg) - method close( state: DisconnectingConnection ) {.async: (raises: [CancelledError, QuicError]).} = - handleWithQuicError: - await state.disconnect + await state.disconnect let connection = state.connection.valueOr: return connection.switch(newClosedConnection(state.derCertificates)) @@ -78,8 +69,7 @@ method drop( state: DisconnectingConnection ) {.async: (raises: [CancelledError, QuicError]).} = trace "Drop DisconnectingConnection state" - handleWithQuicError: - await state.disconnect + await state.disconnect let connection = state.connection.valueOr: return connection.switch(newClosedConnection(state.derCertificates)) diff --git a/quic/transport/quicconnection.nim b/quic/transport/quicconnection.nim index 751f5df..674ef6e 100644 --- a/quic/transport/quicconnection.nim +++ b/quic/transport/quicconnection.nim @@ -14,7 +14,7 @@ type handshake*: AsyncEvent timeout*: AsyncEvent error*: AsyncEventQueue[string] - disconnect*: Opt[proc(): Future[void] {.gcsafe, raises: [].}] + disconnect*: Opt[proc(): Future[void] {.gcsafe, async: (raises: [CancelledError]).}] onNewId*: IdCallback onRemoveId*: IdCallback diff --git a/quic/transport/stream.nim b/quic/transport/stream.nim index 46719b8..eef6533 100644 --- a/quic/transport/stream.nim +++ b/quic/transport/stream.nim @@ -76,25 +76,32 @@ proc switch*(stream: Stream, newState: StreamState) {.raises: [QuicError].} = proc id*(stream: Stream): int64 = stream.id -proc read*(stream: Stream): Future[seq[byte]] {.async.} = +proc read*( + stream: Stream +): Future[seq[byte]] {.async: (raises: [CancelledError, QuicError]).} = result = await stream.state.read() -proc write*(stream: Stream, bytes: seq[byte]) {.async.} = +proc write*( + stream: Stream, bytes: seq[byte] +) {.async: (raises: [CancelledError, QuicError]).} = # Writing has to be serialized on the same stream as otherwise # data might not be sent correctly. await stream.lock.acquire() defer: - stream.lock.release() + try: + stream.lock.release() + except AsyncLockError: + discard # should not happen - lock acquired directly above await stream.state.write(bytes) -proc close*(stream: Stream) {.async.} = +proc close*(stream: Stream) {.async: (raises: [CancelledError, QuicError]).} = await stream.state.close() -proc closeWrite*(stream: Stream) {.async.} = +proc closeWrite*(stream: Stream) {.async: (raises: [CancelledError, QuicError]).} = await stream.state.closeWrite() -proc closeRead*(stream: Stream) {.async.} = +proc closeRead*(stream: Stream) {.async: (raises: [CancelledError, QuicError]).} = await stream.state.closeRead() proc reset*(stream: Stream) =