chore(stream): set stream data only once (#151)

This commit is contained in:
vladopajic
2025-10-16 16:06:13 +02:00
committed by GitHub
parent b7cc500fad
commit 49609765c8
7 changed files with 34 additions and 24 deletions

View File

@@ -6,8 +6,13 @@ import ../streamstate/openstate
import ./connection
import chronicles
logScope:
topics = "native stream"
proc newStream(connection: Ngtcp2Connection, id: int64): Stream =
newStream(id, newOpenStreamState(connection))
let stream = newStream(id, newOpenStreamState(connection))
connection.setStreamUserData(id, unsafeAddr stream[])
return stream
proc openStream*(
connection: Ngtcp2Connection, unidirectional: bool
@@ -34,9 +39,12 @@ proc onStreamClose(
stream_user_data: pointer,
): cint {.cdecl.} =
trace "onStreamClose"
let state = cast[StreamState](stream_user_data)
if state != nil:
state.onClose()
let stream = cast[Stream](stream_user_data)
if stream != nil:
try:
stream.onClose()
except QuicError as e:
error "Unexpect error onStreamClose", msg = e.msg
proc onReceiveStreamData(
connection: ptr ngtcp2_conn,
@@ -49,12 +57,15 @@ proc onReceiveStreamData(
stream_user_data: pointer,
): cint {.cdecl.} =
trace "onReceiveStreamData"
let state = cast[StreamState](stream_user_data)
var bytes = newSeqUninit[byte](datalen)
copyMem(bytes.toUnsafePtr, data, datalen)
let isFin = (flags and NGTCP2_STREAM_DATA_FLAG_FIN) != 0
if state != nil:
state.receive(uint64(offset), bytes, isFin)
let stream = cast[Stream](stream_user_data)
if stream != nil:
var bytes = newSeqUninit[byte](datalen)
copyMem(bytes.toUnsafePtr, data, datalen)
let isFin = (flags and NGTCP2_STREAM_DATA_FLAG_FIN) != 0
try:
stream.onReceive(uint64(offset), bytes, isFin)
except QuicError as e:
error "Unexpect error onReceiveStreamData", msg = e.msg
proc onStreamReset(
connection: ptr ngtcp2_conn,
@@ -65,9 +76,12 @@ proc onStreamReset(
stream_user_data: pointer,
): cint {.cdecl.} =
trace "onStreamReset"
let state = cast[StreamState](stream_user_data)
if state != nil:
state.reset()
let stream = cast[Stream](stream_user_data)
if stream != nil:
try:
stream.reset()
except QuicError as e:
error "Unexpect error onStreamReset", msg = e.msg
proc onStreamStopSending(
conn: ptr ngtcp2_conn,

View File

@@ -21,11 +21,6 @@ method write*(
return
await state.connection.send(stream.id, bytes)
proc setUserData*(
state: BaseStreamState, stream: stream.Stream
) {.raises: [QuicError].} =
state.connection.setStreamUserData(stream.id, unsafeAddr state[])
proc allowMoreIncomingBytes*(state: BaseStreamState, amount: uint64) =
let stream = state.stream.valueOr:
return

View File

@@ -20,7 +20,6 @@ proc newClosedStreamState*(
method enter*(state: ClosedStreamState, stream: Stream) {.raises: [QuicError].} =
procCall enter(StreamState(state), stream)
state.stream = Opt.some(stream)
state.setUserData(stream)
if state.wasReset:
state.queue.reset()
state.queue.close()

View File

@@ -15,7 +15,6 @@ proc newOpenStreamState*(connection: Ngtcp2Connection): OpenStreamState =
method enter*(state: OpenStreamState, stream: Stream) =
procCall enter(StreamState(state), stream)
state.stream = Opt.some(stream)
state.setUserData(stream)
method leave*(state: OpenStreamState) =
procCall leave(StreamState(state))

View File

@@ -15,7 +15,6 @@ proc newReceiveStreamState*(base: BaseStreamState): ReceiveStreamState =
method enter*(state: ReceiveStreamState, stream: Stream) =
procCall enter(StreamState(state), stream)
state.stream = Opt.some(stream)
state.setUserData(stream)
state.sendFin(stream)
method leave*(state: ReceiveStreamState) =

View File

@@ -13,7 +13,6 @@ proc newSendStreamState*(base: BaseStreamState): SendStreamState =
method enter*(state: SendStreamState, stream: Stream) {.raises: [QuicError].} =
procCall enter(StreamState(state), stream)
state.stream = Opt.some(stream)
state.setUserData(stream)
state.queue.close()
method leave*(state: SendStreamState) =

View File

@@ -104,10 +104,10 @@ proc closeWrite*(stream: Stream) {.async: (raises: [CancelledError, QuicError]).
proc closeRead*(stream: Stream) {.async: (raises: [CancelledError, QuicError]).} =
await stream.state.closeRead()
proc reset*(stream: Stream) =
proc reset*(stream: Stream) {.raises: [QuicError].} =
stream.state.reset()
proc onClose*(stream: Stream) =
proc onClose*(stream: Stream) {.raises: [QuicError].} =
stream.state.onClose()
proc isClosed*(stream: Stream): bool =
@@ -118,3 +118,8 @@ proc isUnidirectional*(stream: Stream): bool =
proc expire*(stream: Stream) {.raises: [].} =
stream.state.expire()
proc onReceive*(
stream: Stream, offset: uint64, bytes: seq[byte], isFin: bool
) {.raises: [QuicError].} =
stream.state.receive(offset, bytes, isFin)