mirror of
https://github.com/vacp2p/nim-quic.git
synced 2026-01-09 22:08:09 -05:00
fix: close should clean stream userdata (#71)
This commit is contained in:
@@ -50,7 +50,8 @@ proc onReceiveStreamData(
|
||||
var bytes = newSeqUninitialized[byte](datalen)
|
||||
copyMem(bytes.toUnsafePtr, data, datalen)
|
||||
let isFin = (flags and NGTCP2_STREAM_DATA_FLAG_FIN) != 0
|
||||
state.receive(uint64(offset), bytes, isFin)
|
||||
if state != nil:
|
||||
state.receive(uint64(offset), bytes, isFin)
|
||||
|
||||
proc onStreamReset(
|
||||
connection: ptr ngtcp2_conn,
|
||||
|
||||
@@ -30,9 +30,10 @@ proc newClosedStream*(
|
||||
connection: connection,
|
||||
)
|
||||
|
||||
method enter(state: ClosedStream, stream: Stream) =
|
||||
method enter*(state: ClosedStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, nil)
|
||||
|
||||
proc clearUserData*(state: ClosedStream) =
|
||||
try:
|
||||
@@ -40,10 +41,10 @@ proc clearUserData*(state: ClosedStream) =
|
||||
except Ngtcp2Error:
|
||||
discard # stream already closed
|
||||
|
||||
method leave(state: ClosedStream) =
|
||||
method leave*(state: ClosedStream) =
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read(state: ClosedStream): Future[seq[byte]] {.async.} =
|
||||
method read*(state: ClosedStream): Future[seq[byte]] {.async.} =
|
||||
if not state.frameSorter.isComplete():
|
||||
let incomingFut = state.remaining.get()
|
||||
if (await race(incomingFut, state.cancelRead)) == incomingFut:
|
||||
@@ -61,28 +62,25 @@ method read(state: ClosedStream): Future[seq[byte]] {.async.} =
|
||||
|
||||
raise newException(StreamError, "stream is closed")
|
||||
|
||||
method write(state: ClosedStream, bytes: seq[byte]) {.async.} =
|
||||
method write*(state: ClosedStream, bytes: seq[byte]) {.async.} =
|
||||
trace "cant write, stream is closed"
|
||||
raise newException(ClosedStreamError, "stream is closed")
|
||||
|
||||
method close(state: ClosedStream) {.async.} =
|
||||
method close*(state: ClosedStream) {.async.} =
|
||||
discard
|
||||
|
||||
method onClose(state: ClosedStream) =
|
||||
method onClose*(state: ClosedStream) =
|
||||
discard
|
||||
|
||||
method isClosed(state: ClosedStream): bool =
|
||||
method isClosed*(state: ClosedStream): bool =
|
||||
true
|
||||
|
||||
method receive(state: ClosedStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
method receive*(state: ClosedStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
if state.frameSorter.isComplete():
|
||||
return
|
||||
|
||||
state.frameSorter.insert(offset, bytes, isFin)
|
||||
|
||||
if state.frameSorter.isComplete():
|
||||
state.clearUserData()
|
||||
|
||||
if state.stream.isSome:
|
||||
let stream = state.stream.get()
|
||||
stream.closed.fire()
|
||||
|
||||
@@ -2,7 +2,7 @@ import ../../../basics
|
||||
import ../../framesorter
|
||||
import ../../stream
|
||||
import ./helpers
|
||||
import ../native/connection
|
||||
import ../native/[connection, errors]
|
||||
import ./closedstate
|
||||
import chronicles
|
||||
|
||||
@@ -27,11 +27,11 @@ method enter*(state: OpenStream, stream: Stream) =
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, unsafeAddr state[])
|
||||
|
||||
method leave(state: OpenStream) =
|
||||
method leave*(state: OpenStream) =
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read(state: OpenStream): Future[seq[byte]] {.async.} =
|
||||
method read*(state: OpenStream): Future[seq[byte]] {.async.} =
|
||||
let incomingFut = state.incoming.get()
|
||||
if (await race(incomingFut, state.cancelRead)) == incomingFut:
|
||||
result = await incomingFut
|
||||
@@ -46,19 +46,19 @@ method read(state: OpenStream): Future[seq[byte]] {.async.} =
|
||||
)
|
||||
raise newException(StreamError, "stream is closed")
|
||||
|
||||
method write(state: OpenStream, bytes: seq[byte]): Future[void] =
|
||||
method write*(state: OpenStream, bytes: seq[byte]): Future[void] =
|
||||
# let stream = state.stream.valueOr:
|
||||
# raise newException(QuicError, "stream is closed")
|
||||
# See https://github.com/status-im/nim-quic/pull/41 for more details
|
||||
state.connection.send(state.stream.get.id, bytes)
|
||||
|
||||
method close(state: OpenStream) {.async.} =
|
||||
method close*(state: OpenStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true)
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter, state.connection))
|
||||
|
||||
method reset(state: OpenStream) {.async.} =
|
||||
method reset*(state: OpenStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
state.cancelRead.cancelSoon()
|
||||
@@ -75,7 +75,7 @@ method onClose*(state: OpenStream) =
|
||||
method isClosed*(state: OpenStream): bool =
|
||||
false
|
||||
|
||||
method receive(state: OpenStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
method receive*(state: OpenStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
|
||||
Reference in New Issue
Block a user