refactor: remove framesorter (#118)

This commit is contained in:
vladopajic
2025-09-18 18:41:15 +02:00
committed by GitHub
parent e5bcee4b90
commit 809bf493ff
10 changed files with 149 additions and 300 deletions

View File

@@ -1,126 +0,0 @@
import ../errors
import std/tables
import chronos
type FrameSorter* = ref object of RootRef
buffer*: Table[int64, byte] # sparse byte storage
emitPos*: int64 # where to emit data from
incoming*: AsyncQueue[seq[byte]]
totalBytes*: Opt[int64]
# contains total bytes for frame; and is known once a FIN is received
closed: bool
proc initFrameSorter*(incoming: AsyncQueue[seq[byte]]): FrameSorter =
return FrameSorter(
incoming: incoming,
buffer: initTable[int64, byte](),
emitPos: 0,
totalBytes: Opt.none(int64),
closed: false,
)
proc isEOF*(fs: FrameSorter): bool =
if fs.closed:
return true
if fs.totalBytes.isNone:
return false
return fs.emitPos >= fs.totalBytes.get()
proc sendEof(fs: var FrameSorter) {.raises: [QuicError].} =
if fs.isEOF():
# empty sequence is sent to unblock reading from incoming queue
try:
fs.incoming.putNoWait(@[])
except AsyncQueueFullError:
raise newException(QuicError, "Incoming queue is full")
proc putToQueue(fs: var FrameSorter, data: seq[byte]) {.raises: [QuicError].} =
if data.len > 0:
try:
fs.incoming.putNoWait(data)
except AsyncQueueFullError:
raise newException(QuicError, "Incoming queue is full")
fs.sendEof()
proc emitBufferedData(fs: var FrameSorter) {.raises: [QuicError].} =
var emitData: seq[byte]
while fs.buffer.hasKey(fs.emitPos):
try:
emitData.add fs.buffer[fs.emitPos]
except KeyError:
doAssert false, "already checked with hasKey"
fs.buffer.del(fs.emitPos)
inc fs.emitPos
fs.putToQueue(emitData)
proc close*(fs: var FrameSorter) =
if fs.closed:
return
fs.closed = true
fs.sendEof()
proc insert*(
fs: var FrameSorter, offset: uint64, data: seq[byte], isFin: bool
) {.raises: [QuicError].} =
if fs.closed:
return
if isFin:
fs.totalBytes = Opt.some(offset.int64 + max(data.len - 1, 0))
defer:
# send EOF in defer so that it happens after
# data is written to incoming queue (if any)
fs.sendEof()
if data.len == 0:
return
# if offset matches emit position, framesorter can emit entire input in batch
if offset.int == fs.emitPos:
fs.emitPos += data.len
fs.putToQueue(data)
# in addition check if there is buffered data to emit
fs.emitBufferedData()
return
# Insert bytes into sparse buffer
for i, b in data:
let pos = offset.int + i
if fs.totalBytes.isSome and pos > fs.totalBytes.unsafeGet:
continue
if fs.buffer.hasKey(pos):
try:
if fs.buffer[pos] != b:
raise newException(QuicError, "conflicting byte received. protocol violation")
# else: already same value, nothing to do
except KeyError:
doAssert false, "already checked with hasKey"
elif pos >= fs.emitPos: # put data to buffer, avoiding emitted data
fs.buffer[pos] = b
# Try to emit contiguous data
fs.emitBufferedData()
proc reset*(fs: var FrameSorter) =
fs.totalBytes = Opt.none(int64)
fs.buffer.clear()
fs.incoming.clear()
fs.emitPos = 0
# resetting FS should leave fs.closed (if it was set)
proc isComplete*(fs: FrameSorter): bool =
if fs.closed:
return true
if fs.totalBytes.isNone:
return false
let total = fs.totalBytes.unsafeGet
return fs.emitPos - 1 + len(fs.buffer) >= total

View File

@@ -1,13 +1,12 @@
import ../../../basics
import ../../framesorter
import ../../stream
import ../native/connection
import ./queue
type BaseStreamState* = ref object of StreamState
stream*: Opt[Stream]
incoming*: AsyncQueue[seq[byte]]
queue*: StreamQueue
connection*: Ngtcp2Connection
frameSorter*: FrameSorter
finSent*: bool
method expire*(state: BaseStreamState) {.raises: [].} =

View File

@@ -1,7 +1,7 @@
import ../../../errors
import ../../../basics
import ../../stream
import ../../framesorter
import ./queue
import ./basestate
type ClosedStreamState* = ref object of BaseStreamState
@@ -12,8 +12,7 @@ proc newClosedStreamState*(
): ClosedStreamState =
ClosedStreamState(
connection: base.connection,
incoming: base.incoming,
frameSorter: base.frameSorter,
queue: base.queue,
finSent: base.finSent,
wasReset: wasReset,
)
@@ -23,8 +22,8 @@ method enter*(state: ClosedStreamState, stream: Stream) =
state.stream = Opt.some(stream)
state.setUserData(stream)
if state.wasReset:
state.frameSorter.reset()
state.frameSorter.close()
state.queue.reset()
state.queue.close()
if state.wasReset:
state.reset(stream)
else:
@@ -40,7 +39,7 @@ method read*(state: ClosedStreamState): Future[seq[byte]] {.async.} =
raise newException(ClosedStreamError, "stream was reset")
try:
return state.incoming.popFirstNoWait()
return state.queue.incoming.popFirstNoWait()
except AsyncQueueEmptyError:
discard

View File

@@ -1,7 +1,7 @@
import ../../../basics
import ../../framesorter
import ../../stream
import ../native/connection
import ./queue
import ./basestate
import ./closestate
import ./receivestate
@@ -10,10 +10,7 @@ import ./sendstate
type OpenStreamState* = ref object of BaseStreamState
proc newOpenStreamState*(connection: Ngtcp2Connection): OpenStreamState =
let incomingQ = newAsyncQueue[seq[byte]]()
OpenStreamState(
connection: connection, incoming: incomingQ, frameSorter: initFrameSorter(incomingQ)
)
OpenStreamState(connection: connection, queue: initStreamQueue())
method enter*(state: OpenStreamState, stream: Stream) =
procCall enter(StreamState(state), stream)
@@ -26,10 +23,10 @@ method leave*(state: OpenStreamState) =
method read*(state: OpenStreamState): Future[seq[byte]] {.async.} =
# Check for immediate EOF conditions
if state.frameSorter.isEOF() and state.incoming.len == 0:
if state.queue.isEOF() and state.queue.incoming.len == 0:
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
let data = await state.incoming.get()
let data = await state.queue.incoming.get()
# If we got data, return it with flow control update
if data.len > 0:
@@ -37,7 +34,7 @@ method read*(state: OpenStreamState): Future[seq[byte]] {.async.} =
return data
# Empty data (len == 0) and this is EOF
if state.frameSorter.isEOF():
if state.queue.isEOF():
return @[] # Return EOF per RFC 9000
# Empty data but no EOF; continue reading for more data
@@ -62,7 +59,7 @@ method isClosed*(state: OpenStreamState): bool =
false
method receive*(state: OpenStreamState, offset: uint64, bytes: seq[byte], isFin: bool) =
state.frameSorter.insert(offset, bytes, isFin)
state.queue.insert(offset, bytes, isFin)
method reset*(state: OpenStreamState) =
state.switch(newClosedStreamState(state, wasReset = true))

View File

@@ -0,0 +1,65 @@
import ../../../errors
import chronos
type StreamQueue* = ref object of RootRef
emitPos*: int64 # where to emit data from
incoming*: AsyncQueue[seq[byte]]
totalBytes*: Opt[int64]
# contains total bytes for frame; and is known once a FIN is received
closed: bool
proc initStreamQueue*(): StreamQueue =
return StreamQueue(
incoming: newAsyncQueue[seq[byte]](),
emitPos: 0,
totalBytes: Opt.none(int64),
closed: false,
)
proc isEOF*(sq: StreamQueue): bool =
if sq.closed:
return true
if sq.totalBytes.isNone:
return false
return sq.emitPos >= sq.totalBytes.get()
proc sendEof(sq: var StreamQueue) {.raises: [QuicError].} =
if sq.isEOF():
# empty sequence is sent to unblock reading from incoming queue
try:
sq.incoming.putNoWait(@[])
except AsyncQueueFullError:
raise newException(QuicError, "Incoming queue is full")
proc close*(sq: var StreamQueue) =
if sq.closed:
return
sq.closed = true
sq.sendEof()
proc insert*(
sq: var StreamQueue, ofsqet: uint64, data: seq[byte], isFin: bool
) {.raises: [QuicError].} =
if sq.closed:
return
if isFin:
sq.totalBytes = Opt.some(ofsqet.int64 + max(data.len - 1, 0))
try:
if data.len > 0:
sq.incoming.putNoWait(data)
sq.emitPos += data.len
except AsyncQueueFullError:
raise newException(QuicError, "Incoming queue is full")
sq.sendEof()
proc reset*(sq: var StreamQueue) =
sq.totalBytes = Opt.none(int64)
sq.incoming.clear()
sq.emitPos = 0
# resetting FS should leave sq.closed (if it was set)

View File

@@ -1,7 +1,7 @@
import ../../../errors
import ../../../basics
import ../../stream
import ../../framesorter
import ./queue
import ./basestate
import ./closestate
@@ -9,10 +9,7 @@ type ReceiveStreamState* = ref object of BaseStreamState
proc newReceiveStreamState*(base: BaseStreamState): ReceiveStreamState =
ReceiveStreamState(
connection: base.connection,
incoming: base.incoming,
frameSorter: base.frameSorter,
finSent: base.finSent,
connection: base.connection, queue: base.queue, finSent: base.finSent
)
method enter*(state: ReceiveStreamState, stream: Stream) =
@@ -27,11 +24,11 @@ method leave*(state: ReceiveStreamState) =
method read*(state: ReceiveStreamState): Future[seq[byte]] {.async.} =
# Check for immediate EOF conditions
if state.frameSorter.isEOF() and state.incoming.len == 0:
if state.queue.isEOF() and state.queue.incoming.len == 0:
state.switch(newClosedStreamState(state))
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
let data = await state.incoming.get()
let data = await state.queue.incoming.get()
# If we got data, return it with flow control update
if data.len > 0:
@@ -39,7 +36,7 @@ method read*(state: ReceiveStreamState): Future[seq[byte]] {.async.} =
return data
# Empty data (len == 0) and this is EOF
if state.frameSorter.isEOF():
if state.queue.isEOF():
state.switch(newClosedStreamState(state))
return @[] # Return EOF per RFC 9000
@@ -67,8 +64,8 @@ method isClosed*(state: ReceiveStreamState): bool =
method receive*(
state: ReceiveStreamState, offset: uint64, bytes: seq[byte], isFin: bool
) =
state.frameSorter.insert(offset, bytes, isFin)
if state.frameSorter.isComplete():
state.queue.insert(offset, bytes, isFin)
if state.queue.isEOF():
state.switch(newClosedStreamState(state))
method reset*(state: ReceiveStreamState) =

View File

@@ -1,25 +1,20 @@
import ../../../errors
import ../../../basics
import ../../stream
import ../../framesorter
import ./queue
import ./basestate
import ./closestate
type SendStreamState* = ref object of BaseStreamState
proc newSendStreamState*(base: BaseStreamState): SendStreamState =
SendStreamState(
connection: base.connection,
incoming: base.incoming,
frameSorter: base.frameSorter,
finSent: base.finSent,
)
SendStreamState(connection: base.connection, queue: base.queue, finSent: base.finSent)
method enter*(state: SendStreamState, stream: Stream) =
procCall enter(StreamState(state), stream)
state.stream = Opt.some(stream)
state.setUserData(stream)
state.frameSorter.close()
state.queue.close()
method leave*(state: SendStreamState) =
procCall leave(StreamState(state))

View File

@@ -1,137 +0,0 @@
import unittest
import quic/transport/framesorter
import quic/errors
import std/[options, tables]
import chronos
proc allData(q: AsyncQueue[seq[byte]]): seq[byte] =
var data: seq[byte]
while q.len > 0:
data.add(waitFor(q.get()))
return data
suite "FrameSorter tests":
test "insert single chunk no FIN":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(0, @[1'u8, 2, 3], false)
check fs.emitPos == 3
check fs.buffer.len == 0
let emitted = allData(q)
check emitted == @[1'u8, 2, 3]
check not fs.isEOF()
test "insert chunks before chunk at offset 0 has been received":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(1, @[2'u8], false)
fs.insert(3, @[4'u8], false)
check fs.emitPos == 0
check fs.buffer.len == 2
check q.len == 0
check not fs.isEOF()
test "insert chunk with FIN":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(0, @[1'u8, 2, 3], true)
check fs.totalBytes.get() == 2
check fs.isEOF()
test "chunks inserted out of order are emitted in correct order":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(1, @[2'u8, 3, 4], false)
fs.insert(4, @[5'u8, 6], true)
fs.insert(0, @[1'u8], false)
check fs.emitPos == 6
check fs.buffer.len == 0
let emitted = allData(q)
check emitted == @[1'u8, 2, 3, 4, 5, 6]
check fs.isEOF()
test "chunks are read correctly":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(0, @[1'u8, 2, 3], false)
check fs.emitPos == 3
check fs.buffer.len == 0
var emitted = allData(q)
check emitted == @[1'u8, 2, 3]
fs.insert(9, @[10'u8, 11, 12], false)
fs.insert(3, @[4'u8, 5, 6], false)
check fs.emitPos == 6
check fs.buffer.len == 3 # [10, 11, 12] are not emitted yet
emitted = allData(q)
check emitted == @[4'u8, 5, 6]
test "chunks received after fin are ignored":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(1, @[2'u8, 3, 4], true)
fs.insert(4, @[5'u8, 6, 7], false)
fs.insert(2, @[3'u8, 4, 5], false)
fs.insert(0, @[1'u8], false)
check fs.emitPos == 4
check fs.buffer.len == 0
var emitted = allData(q)
check emitted == @[1'u8, 2, 3, 4]
test "insert overlapping identical chunk":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(0, @[1'u8, 2, 3], false)
fs.insert(1, @[2'u8, 3], false) # identical bytes, should not raise
check fs.emitPos == 3
var emitted = allData(q)
check emitted == @[1'u8, 2, 3]
test "insert overlapping conflicting chunk":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(1, @[2'u8, 3, 4], false)
expect QuicError:
fs.insert(2, @[9'u8, 3], false)
test "detect complete stream":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(0, @[1'u8, 2, 3], false)
fs.insert(3, @[4'u8, 5], true)
check fs.isComplete()
test "detect incomplete stream with gap":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(0, @[1'u8, 2, 3], false)
fs.insert(4, @[5'u8], true)
check not fs.isComplete()
test "reset":
var q = newAsyncQueue[seq[byte]]()
var fs = initFrameSorter(q)
fs.insert(0, @[1'u8, 2, 3], true)
check fs.totalBytes.isSome
fs.reset()
check fs.totalBytes.isNone
check fs.emitPos == 0
check fs.buffer.len == 0

View File

@@ -0,0 +1,60 @@
import std/[options]
import unittest
import chronos
import quic/transport/ngtcp2/streamstate/queue
proc allData(q: AsyncQueue[seq[byte]]): seq[byte] =
var data: seq[byte]
while q.len > 0:
data.add(waitFor(q.get()))
return data
suite "StreamQueue tests":
test "insert single chunk no FIN":
var fs = initStreamQueue()
fs.insert(0, @[1'u8, 2, 3], false)
check fs.emitPos == 3
check allData(fs.incoming) == @[1'u8, 2, 3]
check not fs.isEOF()
test "insert chunk with FIN":
var fs = initStreamQueue()
fs.insert(0, @[1'u8, 2, 3], true)
check fs.totalBytes.get() == 2
check fs.isEOF()
test "chunks inserted out of order are emitted in correct order":
var fs = initStreamQueue()
fs.insert(0, @[1'u8], false)
fs.insert(1, @[2'u8, 3, 4], false)
fs.insert(4, @[5'u8, 6], true)
check fs.emitPos == 6
check allData(fs.incoming) == @[1'u8, 2, 3, 4, 5, 6]
check fs.isEOF()
test "chunks are read correctly":
var fs = initStreamQueue()
fs.insert(0, @[1'u8, 2, 3], false)
check fs.emitPos == 3
check allData(fs.incoming) == @[1'u8, 2, 3]
fs.insert(3, @[4'u8, 5, 6], false)
fs.insert(9, @[10'u8, 11, 12], false)
check fs.emitPos == 9
check allData(fs.incoming) == @[4'u8, 5, 6, 10, 11, 12]
test "reset":
var fs = initStreamQueue()
fs.insert(0, @[1'u8, 2, 3], true)
check fs.totalBytes.isSome
fs.reset()
check fs.totalBytes.isNone
check fs.emitPos == 0

View File

@@ -15,7 +15,7 @@ import ./quic/testQuicConnection
import ./quic/testListener
import ./quic/testApi
import ./quic/testExample
import ./quic/testFramesorter
import ./quic/testStreamQueue
import ./quic/testPerf
{.warning[UnusedImport]: off.}