refactor(streamstate): list raised errors (#133)

This commit is contained in:
vladopajic
2025-10-03 23:00:38 +02:00
committed by GitHub
parent 6aba324b67
commit 7f687e8c63
14 changed files with 122 additions and 76 deletions

View File

@@ -202,34 +202,25 @@ proc localAddress*(
connection.udp.localAddress()
proc handleNewStream(
connection: Connection, streamFut: Future[Stream]
connection: Connection,
streamFut: Future[Stream].Raising([CancelledError, QuicError]),
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
let closedFut = connection.closed.wait()
let raceFut = await race(streamFut, closedFut)
if raceFut == closedFut:
raise newException(QuicError, "connection closed")
# Note: this try will not be needed once quic.openStream() and
# quic.incomingStream() methods list all exceptions. Even now this is not needed
# but it is here to make compiler happy and to avoid throwing CatchableError.
try:
return await streamFut
except CancelledError as e:
raise e
except QuicError as e:
raise e
except CatchableError as e:
raise newException(QuicError, "opening stream: " & $e.msg)
proc openStream*(
connection: Connection, unidirectional = false
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
return await connection.handleNewStream(connection.quic.openStream(unidirectional))
return await streamFut
proc incomingStream*(
connection: Connection
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
return await connection.handleNewStream(connection.quic.incomingStream())
proc openStream*(
connection: Connection, unidirectional = false
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
return await connection.handleNewStream(connection.quic.openStream(unidirectional))
proc certificates*(connection: Connection): seq[seq[byte]] =
connection.quic.certificates()

View File

@@ -26,7 +26,7 @@ method receive(state: ClosedConnection, datagram: sink Datagram) =
method openStream(
state: ClosedConnection, unidirectional: bool
): Future[Stream] {.async.} =
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
raise newException(ClosedConnectionError, "connection is closed")
method close(state: ClosedConnection) {.async.} =

View File

@@ -50,7 +50,7 @@ method receive(state: DisconnectingConnection, datagram: sink Datagram) =
method openStream(
state: DisconnectingConnection, unidirectional: bool
): Future[Stream] {.async.} =
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
raise newException(ClosedConnectionError, "connection is disconnecting")
method close(state: DisconnectingConnection) {.async.} =

View File

@@ -61,7 +61,7 @@ method receive(state: DrainingConnection, datagram: sink Datagram) =
method openStream(
state: DrainingConnection, unidirectional: bool
): Future[Stream] {.async.} =
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
raise newException(ClosedConnectionError, "connection is closing")
method close(state: DrainingConnection) {.async.} =

View File

@@ -135,7 +135,7 @@ method receive(state: OpenConnection, datagram: sink Datagram) =
method openStream(
state: OpenConnection, unidirectional: bool
): Future[Stream] {.async.} =
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
let quicConnection = state.quicConnection.valueOr:
raise newException(QuicError, "connection is closed")
await quicConnection.handshake.wait()

View File

@@ -137,7 +137,7 @@ proc trySend(
let ecn = ECN(packetInfo.ecn)
Datagram(data: buffer, ecn: ecn)
proc send*(connection: Ngtcp2Connection) =
proc send*(connection: Ngtcp2Connection) {.raises: [QuicError].} =
var done = false
while not done:
let datagram = connection.trySend()
@@ -153,7 +153,7 @@ proc send(
messagePtr: ptr byte,
messageLen: uint,
isFin: bool = false,
): Future[int] {.async.} =
): Future[int] {.async: (raises: [CancelledError, QuicError]).} =
let written = addr result
var datagram = trySend(connection, streamId, messagePtr, messageLen, written, isFin)
@@ -174,7 +174,7 @@ proc send(
proc send*(
connection: Ngtcp2Connection, streamId: int64, bytes: seq[byte], isFin: bool = false
) {.async.} =
) {.async: (raises: [CancelledError, QuicError]).} =
var messagePtr = bytes.toUnsafePtr
var messageLen = bytes.len.uint
var done = false

View File

@@ -1,5 +1,6 @@
import ngtcp2
import ../../../helpers/[openarray, sequninit]
import ../../../errors
import ../../stream
import ../streamstate/openstate
import ./connection
@@ -8,7 +9,9 @@ import chronicles
proc newStream(connection: Ngtcp2Connection, id: int64): Stream =
newStream(id, newOpenStreamState(connection))
proc openStream*(connection: Ngtcp2Connection, unidirectional: bool): Stream =
proc openStream*(
connection: Ngtcp2Connection, unidirectional: bool
): Stream {.raises: [QuicError].} =
var id: int64
if unidirectional:
id = connection.openUniStream()

View File

@@ -14,12 +14,16 @@ method expire*(state: BaseStreamState) {.raises: [].} =
return
stream.closed.fire()
method write*(state: BaseStreamState, bytes: seq[byte]) {.async.} =
method write*(
state: BaseStreamState, bytes: seq[byte]
) {.async: (raises: [CancelledError, QuicError]).} =
let stream = state.stream.valueOr:
return
await state.connection.send(stream.id, bytes)
proc setUserData*(state: BaseStreamState, stream: stream.Stream) =
proc setUserData*(
state: BaseStreamState, stream: stream.Stream
) {.raises: [QuicError].} =
state.connection.setStreamUserData(stream.id, unsafeAddr state[])
proc allowMoreIncomingBytes*(state: BaseStreamState, amount: uint64) =
@@ -33,10 +37,10 @@ proc sendFin*(state: BaseStreamState, stream: stream.Stream) =
state.finSent = true
discard state.connection.send(stream.id, @[], true)
proc reset*(state: BaseStreamState, stream: stream.Stream) =
proc reset*(state: BaseStreamState, stream: stream.Stream) {.raises: [QuicError].} =
state.connection.shutdownStream(stream.id)
proc switch*(state: BaseStreamState, newStream: StreamState) =
proc switch*(state: BaseStreamState, newStream: StreamState) {.raises: [QuicError].} =
let stream = state.stream.valueOr:
return
stream.switch(newStream)

View File

@@ -17,7 +17,7 @@ proc newClosedStreamState*(
wasReset: wasReset,
)
method enter*(state: ClosedStreamState, stream: Stream) =
method enter*(state: ClosedStreamState, stream: Stream) {.raises: [QuicError].} =
procCall enter(StreamState(state), stream)
state.stream = Opt.some(stream)
state.setUserData(stream)
@@ -33,7 +33,9 @@ method enter*(state: ClosedStreamState, stream: Stream) =
method leave*(state: ClosedStreamState) =
doAssert false, "ClosedStreamState state should never leave"
method read*(state: ClosedStreamState): Future[seq[byte]] {.async.} =
method read*(
state: ClosedStreamState
): Future[seq[byte]] {.async: (raises: [CancelledError, QuicError]).} =
# If stream was reset, always throw exception
if state.wasReset:
raise newException(ClosedStreamError, "stream was reset")
@@ -46,16 +48,24 @@ method read*(state: ClosedStreamState): Future[seq[byte]] {.async.} =
# When no more data is available, return EOF instead of throwing exception
return @[]
method write*(state: ClosedStreamState, bytes: seq[byte]) {.async.} =
method write*(
state: ClosedStreamState, bytes: seq[byte]
) {.async: (raises: [CancelledError, QuicError]).} =
raise newException(ClosedStreamError, "stream is closed")
method close*(state: ClosedStreamState) {.async.} =
method close*(
state: ClosedStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
discard
method closeWrite*(state: ClosedStreamState) {.async.} =
method closeWrite*(
state: ClosedStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
discard
method closeRead*(state: ClosedStreamState) {.async.} =
method closeRead*(
state: ClosedStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
discard
method onClose*(state: ClosedStreamState) =
@@ -69,5 +79,5 @@ method receive*(
) =
discard
method reset*(state: ClosedStreamState) =
method reset*(state: ClosedStreamState) {.raises: [QuicError].} =
discard

View File

@@ -21,7 +21,9 @@ method leave*(state: OpenStreamState) =
procCall leave(StreamState(state))
state.stream = Opt.none(Stream)
method read*(state: OpenStreamState): Future[seq[byte]] {.async.} =
method read*(
state: OpenStreamState
): Future[seq[byte]] {.async: (raises: [CancelledError, QuicError]).} =
# Check for immediate EOF conditions
if state.queue.isEOF() and state.queue.incoming.len == 0:
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
@@ -40,16 +42,22 @@ method read*(state: OpenStreamState): Future[seq[byte]] {.async.} =
# Empty data but no EOF; continue reading for more data
return await state.read()
method write*(state: OpenStreamState, bytes: seq[byte]) {.async.} =
method write*(
state: OpenStreamState, bytes: seq[byte]
) {.async: (raises: [CancelledError, QuicError]).} =
await procCall BaseStreamState(state).write(bytes)
method close*(state: OpenStreamState) {.async.} =
method close*(state: OpenStreamState) {.async: (raises: [CancelledError, QuicError]).} =
state.switch(newReceiveStreamState(state))
method closeWrite*(state: OpenStreamState) {.async.} =
method closeWrite*(
state: OpenStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
state.switch(newReceiveStreamState(state))
method closeRead*(state: OpenStreamState) {.async.} =
method closeRead*(
state: OpenStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
state.switch(newSendStreamState(state))
method onClose*(state: OpenStreamState) =
@@ -61,5 +69,5 @@ method isClosed*(state: OpenStreamState): bool =
method receive*(state: OpenStreamState, offset: uint64, bytes: seq[byte], isFin: bool) =
state.queue.insert(offset, bytes, isFin)
method reset*(state: OpenStreamState) =
method reset*(state: OpenStreamState) {.raises: [QuicError].} =
state.switch(newClosedStreamState(state, wasReset = true))

View File

@@ -22,7 +22,9 @@ method leave*(state: ReceiveStreamState) =
procCall leave(StreamState(state))
state.stream = Opt.none(Stream)
method read*(state: ReceiveStreamState): Future[seq[byte]] {.async.} =
method read*(
state: ReceiveStreamState
): Future[seq[byte]] {.async: (raises: [CancelledError, QuicError]).} =
# Check for immediate EOF conditions
if state.queue.isEOF() and state.queue.incoming.len == 0:
state.switch(newClosedStreamState(state))
@@ -43,16 +45,24 @@ method read*(state: ReceiveStreamState): Future[seq[byte]] {.async.} =
# Empty data but no EOF; continue reading for more data
return await state.read()
method write*(state: ReceiveStreamState, bytes: seq[byte]) {.async.} =
method write*(
state: ReceiveStreamState, bytes: seq[byte]
) {.async: (raises: [CancelledError, QuicError]).} =
raise newException(ClosedStreamError, "write side is closed")
method close*(state: ReceiveStreamState) {.async.} =
method close*(
state: ReceiveStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
state.switch(newClosedStreamState(state))
method closeWrite*(state: ReceiveStreamState) {.async.} =
method closeWrite*(
state: ReceiveStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
discard
method closeRead*(state: ReceiveStreamState) {.async.} =
method closeRead*(
state: ReceiveStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
state.switch(newClosedStreamState(state))
method onClose*(state: ReceiveStreamState) =
@@ -68,5 +78,5 @@ method receive*(
if state.queue.isEOF():
state.switch(newClosedStreamState(state))
method reset*(state: ReceiveStreamState) =
method reset*(state: ReceiveStreamState) {.raises: [QuicError].} =
state.switch(newClosedStreamState(state, wasReset = true))

View File

@@ -10,7 +10,7 @@ type SendStreamState* = ref object of BaseStreamState
proc newSendStreamState*(base: BaseStreamState): SendStreamState =
SendStreamState(connection: base.connection, queue: base.queue, finSent: base.finSent)
method enter*(state: SendStreamState, stream: Stream) =
method enter*(state: SendStreamState, stream: Stream) {.raises: [QuicError].} =
procCall enter(StreamState(state), stream)
state.stream = Opt.some(stream)
state.setUserData(stream)
@@ -20,19 +20,27 @@ method leave*(state: SendStreamState) =
procCall leave(StreamState(state))
state.stream = Opt.none(Stream)
method read*(state: SendStreamState): Future[seq[byte]] {.async.} =
method read*(
state: SendStreamState
): Future[seq[byte]] {.async: (raises: [CancelledError, QuicError]).} =
raise newException(ClosedStreamError, "read side is closed")
method write*(state: SendStreamState, bytes: seq[byte]) {.async.} =
method write*(
state: SendStreamState, bytes: seq[byte]
) {.async: (raises: [CancelledError, QuicError]).} =
await procCall BaseStreamState(state).write(bytes)
method close*(state: SendStreamState) {.async.} =
method close*(state: SendStreamState) {.async: (raises: [CancelledError, QuicError]).} =
state.switch(newClosedStreamState(state))
method closeWrite*(state: SendStreamState) {.async.} =
method closeWrite*(
state: SendStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
state.switch(newClosedStreamState(state))
method closeRead*(stream: SendStreamState) {.async.} =
method closeRead*(
stream: SendStreamState
) {.async: (raises: [CancelledError, QuicError]).} =
discard
method onClose*(state: SendStreamState) =
@@ -44,5 +52,5 @@ method isClosed*(state: SendStreamState): bool =
method receive*(state: SendStreamState, offset: uint64, bytes: seq[byte], isFin: bool) =
discard
method reset*(state: SendStreamState) =
method reset*(state: SendStreamState) {.raises: [QuicError].} =
state.switch(newClosedStreamState(state, wasReset = true))

View File

@@ -43,7 +43,9 @@ method send*(state: ConnectionState) =
method receive*(state: ConnectionState, datagram: sink Datagram) =
doAssert false # override this method
method openStream*(state: ConnectionState, unidirectional: bool): Future[Stream] =
method openStream*(
state: ConnectionState, unidirectional: bool
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
doAssert false # override this method
method drop*(state: ConnectionState): Future[void] {.gcsafe.} =
@@ -85,11 +87,15 @@ proc send*(connection: QuicConnection) =
proc receive*(connection: QuicConnection, datagram: sink Datagram) =
connection.state.receive(datagram)
proc openStream*(connection: QuicConnection, unidirectional = false): Future[Stream] =
connection.state.openStream(unidirectional = unidirectional)
proc openStream*(
connection: QuicConnection, unidirectional = false
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
await connection.state.openStream(unidirectional = unidirectional)
proc incomingStream*(connection: QuicConnection): Future[Stream] =
connection.incoming.get()
proc incomingStream*(
connection: QuicConnection
): Future[Stream] {.async: (raises: [CancelledError, QuicError]).} =
await connection.incoming.get()
proc close*(connection: QuicConnection): Future[void] =
connection.state.close()

View File

@@ -13,34 +13,42 @@ type
StreamError* = object of QuicError
{.push raises: [QuicError].}
method enter*(state: StreamState, stream: Stream) {.base.} =
method enter*(state: StreamState, stream: Stream) {.base, raises: [QuicError].} =
doAssert not state.entered # states are not reentrant
state.entered = true
method leave*(state: StreamState) {.base.} =
method leave*(state: StreamState) {.base, raises: [QuicError].} =
discard
method read*(state: StreamState): Future[seq[byte]] {.base, async.} =
method read*(
state: StreamState
): Future[seq[byte]] {.base, async: (raises: [CancelledError, QuicError]).} =
doAssert false, "override this method"
method write*(state: StreamState, bytes: seq[byte]) {.base, async.} =
method write*(
state: StreamState, bytes: seq[byte]
) {.base, async: (raises: [CancelledError, QuicError]).} =
doAssert false, "override this method"
method close*(state: StreamState) {.base, async.} =
method close*(
state: StreamState
) {.base, async: (raises: [CancelledError, QuicError]).} =
doAssert false, "override this method"
method closeWrite*(state: StreamState) {.base, async.} =
method closeWrite*(
state: StreamState
) {.base, async: (raises: [CancelledError, QuicError]).} =
doAssert false, "override this method"
method closeRead*(state: StreamState) {.base, async.} =
method closeRead*(
state: StreamState
) {.base, async: (raises: [CancelledError, QuicError]).} =
doAssert false, "override this method"
method reset*(state: StreamState) {.base.} =
method reset*(state: StreamState) {.base, raises: [QuicError].} =
doAssert false, "override this method"
method onClose*(state: StreamState) {.base.} =
method onClose*(state: StreamState) {.base, raises: [QuicError].} =
doAssert false, "override this method"
method isClosed*(state: StreamState): bool {.base, raises: [].} =
@@ -48,21 +56,19 @@ method isClosed*(state: StreamState): bool {.base, raises: [].} =
method receive*(
state: StreamState, offset: uint64, bytes: seq[byte], isFin: bool
) {.base.} =
) {.base, raises: [QuicError].} =
doAssert false, "override this method"
{.pop.}
method expire*(state: StreamState) {.base, raises: [].} =
doAssert false, "override this method"
proc newStream*(id: int64, state: StreamState): Stream =
proc newStream*(id: int64, state: StreamState): Stream {.raises: [QuicError].} =
let stream =
Stream(state: state, id: id, closed: newAsyncEvent(), lock: newAsyncLock())
state.enter(stream)
stream
proc switch*(stream: Stream, newState: StreamState) =
proc switch*(stream: Stream, newState: StreamState) {.raises: [QuicError].} =
stream.state.leave()
stream.state = newState
stream.state.enter(stream)