chore(stream): improvements (#101)

This commit is contained in:
vladopajic
2025-08-21 20:06:17 +02:00
committed by GitHub
parent 918ea9d068
commit aa46da8541
6 changed files with 13 additions and 22 deletions

View File

@@ -53,15 +53,18 @@ proc emitBufferedData(fs: var FrameSorter) {.raises: [QuicError].} =
proc insert*(
fs: var FrameSorter, offset: uint64, data: seq[byte], isFin: bool
) {.raises: [QuicError].} =
if isFin and fs.totalBytes.isNone:
if isFin:
fs.totalBytes = Opt.some(offset.int64 + max(data.len - 1, 0))
defer:
# send EOF in defer so that it happens after
# data is written to incoming queue (if any)
fs.sendEof()
if data.len == 0:
return
# if offset matches emit position, framesorter can emit entire input in batch
if offset.int == fs.emitPos and data.len > 0:
if offset.int == fs.emitPos:
fs.emitPos += data.len
fs.putToQueue(data)
@@ -76,7 +79,6 @@ proc insert*(
if fs.totalBytes.isSome and pos > fs.totalBytes.unsafeGet:
continue
if fs.buffer.hasKey(pos):
try:
if fs.buffer[pos] != b:

View File

@@ -3,6 +3,7 @@ import ../../../basics
import ../../stream
import ../../framesorter
import ./basestream
import ./helpers
type ClosedStream* = ref object of BaseStream
wasReset: bool
@@ -13,7 +14,7 @@ proc newClosedStream*(
ClosedStream(incoming: incoming, wasReset: wasReset)
method enter*(state: ClosedStream, stream: Stream) =
discard
setUserData(state.stream, state.connection, nil)
method leave*(state: ClosedStream) =
discard

View File

@@ -22,7 +22,6 @@ method enter*(state: OpenStream, stream: Stream) =
setUserData(state.stream, state.connection, unsafeAddr state[])
method leave*(state: OpenStream) =
setUserData(state.stream, state.connection, nil)
procCall leave(StreamState(state))
state.stream = Opt.none(Stream)
@@ -85,12 +84,11 @@ method isClosed*(state: OpenStream): bool =
false
method receive*(state: OpenStream, offset: uint64, bytes: seq[byte], isFin: bool) =
let stream = state.stream.valueOr:
return
state.frameSorter.insert(offset, bytes, isFin)
if state.frameSorter.isComplete():
let stream = state.stream.valueOr:
return
stream.closed.fire()
stream.switch(newClosedStream(state.incoming, state.frameSorter))

View File

@@ -22,7 +22,6 @@ method enter*(state: ReceiveStream, stream: Stream) =
setUserData(state.stream, state.connection, unsafeAddr state[])
method leave*(state: ReceiveStream) =
setUserData(state.stream, state.connection, nil)
procCall leave(StreamState(state))
state.stream = Opt.none(Stream)
@@ -88,12 +87,11 @@ method isClosed*(state: ReceiveStream): bool =
false
method receive*(state: ReceiveStream, offset: uint64, bytes: seq[byte], isFin: bool) =
let stream = state.stream.valueOr:
return
state.frameSorter.insert(offset, bytes, isFin)
if state.frameSorter.isComplete():
let stream = state.stream.valueOr:
return
stream.closed.fire()
stream.switch(newClosedStream(state.incoming, state.frameSorter))

View File

@@ -22,7 +22,6 @@ method enter*(state: SendStream, stream: Stream) =
setUserData(state.stream, state.connection, unsafeAddr state[])
method leave*(state: SendStream) =
setUserData(state.stream, state.connection, nil)
procCall leave(StreamState(state))
state.stream = Opt.none(Stream)

View File

@@ -704,17 +704,10 @@ suite "streams":
await clientStream.write(testData)
await clientStream.closeWrite()
let clientTask = clientWriteTask()
# Small delay to let client start writing first
await sleepAsync(5.milliseconds)
discard clientWriteTask()
# Server starts reading in parallel (after client already started)
let serverTask = readStreamTillEOF(serverStream)
# Wait for both operations to complete
await clientTask
let receivedData = await serverTask
let receivedData = await readStreamTillEOF(serverStream)
# Verify data
check receivedData == testData