fix: closing stream on stream reset (#79)

This commit is contained in:
richΛrd
2025-05-26 09:54:09 -04:00
committed by GitHub
parent a10c6f0d89
commit c11c736558
5 changed files with 11 additions and 11 deletions

View File

@@ -62,10 +62,9 @@ proc onStreamReset(
stream_user_data: pointer,
): cint {.cdecl.} =
trace "onStreamReset"
let stream = cast[Stream](stream_user_data)
if stream != nil:
stream.onClose()
return 0
let state = cast[StreamState](stream_user_data)
if state != nil:
state.reset()
proc onStreamStopSending(
conn: ptr ngtcp2_conn,

View File

@@ -48,7 +48,7 @@ method isClosed*(state: ClosedStream): bool =
method receive*(state: ClosedStream, offset: uint64, bytes: seq[byte], isFin: bool) =
discard
method reset*(state: ClosedStream) {.async.} =
method reset*(state: ClosedStream) =
discard
method expire*(state: ClosedStream) {.raises: [].} =

View File

@@ -31,9 +31,9 @@ method enter*(state: OpenStream, stream: Stream) =
setUserData(state.stream, state.connection, unsafeAddr state[])
method leave*(state: OpenStream) =
setUserData(state.stream, state.connection, nil)
procCall leave(StreamState(state))
state.stream = Opt.none(Stream)
# TODO: clear userdata
method read*(state: OpenStream): Future[seq[byte]] {.async.} =
let incomingFut = state.incoming.get()
@@ -63,9 +63,10 @@ method close*(state: OpenStream) {.async.} =
discard state.connection.send(state.stream.get.id, @[], true)
stream.switch(newClosedStream(state.incoming, state.frameSorter))
method reset*(state: OpenStream) {.async.} =
method reset*(state: OpenStream) =
let stream = state.stream.valueOr:
return
state.closeFut.complete("stream reset")
state.connection.shutdownStream(stream.id)
stream.closed.fire()

View File

@@ -30,7 +30,7 @@ method write*(state: StreamState, bytes: seq[byte]) {.base, async.} =
method close*(state: StreamState) {.base, async.} =
doAssert false, "override this method"
method reset*(state: StreamState) {.base, async.} =
method reset*(state: StreamState) {.base.} =
doAssert false, "override this method"
method onClose*(state: StreamState) {.base.} =
@@ -71,8 +71,8 @@ proc write*(stream: Stream, bytes: seq[byte]) {.async.} =
proc close*(stream: Stream) {.async.} =
await stream.state.close()
proc reset*(stream: Stream) {.async.} =
await stream.state.reset()
proc reset*(stream: Stream) =
stream.state.reset()
proc onClose*(stream: Stream) =
stream.state.onClose()