diff --git a/quic/transport/ngtcp2/native/streams.nim b/quic/transport/ngtcp2/native/streams.nim index 4fe904e..fe8e240 100644 --- a/quic/transport/ngtcp2/native/streams.nim +++ b/quic/transport/ngtcp2/native/streams.nim @@ -62,10 +62,9 @@ proc onStreamReset( stream_user_data: pointer, ): cint {.cdecl.} = trace "onStreamReset" - let stream = cast[Stream](stream_user_data) - if stream != nil: - stream.onClose() - return 0 + let state = cast[StreamState](stream_user_data) + if state != nil: + state.reset() proc onStreamStopSending( conn: ptr ngtcp2_conn, diff --git a/quic/transport/ngtcp2/stream/closedstate.nim b/quic/transport/ngtcp2/stream/closedstate.nim index ddd1f66..d9bf511 100644 --- a/quic/transport/ngtcp2/stream/closedstate.nim +++ b/quic/transport/ngtcp2/stream/closedstate.nim @@ -48,7 +48,7 @@ method isClosed*(state: ClosedStream): bool = method receive*(state: ClosedStream, offset: uint64, bytes: seq[byte], isFin: bool) = discard -method reset*(state: ClosedStream) {.async.} = +method reset*(state: ClosedStream) = discard method expire*(state: ClosedStream) {.raises: [].} = diff --git a/quic/transport/ngtcp2/stream/openstate.nim b/quic/transport/ngtcp2/stream/openstate.nim index a9cd07c..69191c4 100644 --- a/quic/transport/ngtcp2/stream/openstate.nim +++ b/quic/transport/ngtcp2/stream/openstate.nim @@ -31,9 +31,9 @@ method enter*(state: OpenStream, stream: Stream) = setUserData(state.stream, state.connection, unsafeAddr state[]) method leave*(state: OpenStream) = + setUserData(state.stream, state.connection, nil) procCall leave(StreamState(state)) state.stream = Opt.none(Stream) - # TODO: clear userdata method read*(state: OpenStream): Future[seq[byte]] {.async.} = let incomingFut = state.incoming.get() @@ -63,9 +63,10 @@ method close*(state: OpenStream) {.async.} = discard state.connection.send(state.stream.get.id, @[], true) stream.switch(newClosedStream(state.incoming, state.frameSorter)) -method reset*(state: OpenStream) {.async.} = +method reset*(state: OpenStream) = let stream = state.stream.valueOr: return + state.closeFut.complete("stream reset") state.connection.shutdownStream(stream.id) stream.closed.fire() diff --git a/quic/transport/stream.nim b/quic/transport/stream.nim index 66b6997..24f669b 100644 --- a/quic/transport/stream.nim +++ b/quic/transport/stream.nim @@ -30,7 +30,7 @@ method write*(state: StreamState, bytes: seq[byte]) {.base, async.} = method close*(state: StreamState) {.base, async.} = doAssert false, "override this method" -method reset*(state: StreamState) {.base, async.} = +method reset*(state: StreamState) {.base.} = doAssert false, "override this method" method onClose*(state: StreamState) {.base.} = @@ -71,8 +71,8 @@ proc write*(stream: Stream, bytes: seq[byte]) {.async.} = proc close*(stream: Stream) {.async.} = await stream.state.close() -proc reset*(stream: Stream) {.async.} = - await stream.state.reset() +proc reset*(stream: Stream) = + stream.state.reset() proc onClose*(stream: Stream) = stream.state.onClose() diff --git a/tests/quic/testStreams.nim b/tests/quic/testStreams.nim index db71416..6d2a5c3 100644 --- a/tests/quic/testStreams.nim +++ b/tests/quic/testStreams.nim @@ -49,7 +49,7 @@ suite "streams": asyncTest "raises when reading from or writing to reset stream": let stream = await client.openStream() - await stream.reset() + stream.reset() expect QuicError: discard await stream.read()