mirror of
https://github.com/vacp2p/nim-quic.git
synced 2026-01-09 22:08:09 -05:00
fix: race condition when reading and receiving eof (#91)
This commit is contained in:
@@ -8,6 +8,7 @@ type FrameSorter* = object
|
||||
incoming*: AsyncQueue[seq[byte]]
|
||||
totalBytes*: Opt[int64]
|
||||
# contains total bytes for frame; and is known once a FIN is received
|
||||
sentEof: bool
|
||||
|
||||
proc initFrameSorter*(incoming: AsyncQueue[seq[byte]]): FrameSorter =
|
||||
result.incoming = incoming
|
||||
@@ -15,13 +16,30 @@ proc initFrameSorter*(incoming: AsyncQueue[seq[byte]]): FrameSorter =
|
||||
result.emitPos = 0
|
||||
result.totalBytes = Opt.none(int64)
|
||||
|
||||
proc putToQueue(fs: FrameSorter, data: seq[byte]) {.raises: [QuicError].} =
|
||||
proc isEOF*(fs: FrameSorter): bool =
|
||||
if fs.totalBytes.isNone:
|
||||
return false
|
||||
|
||||
return fs.emitPos >= fs.totalBytes.get()
|
||||
|
||||
proc sendEof(fs: var FrameSorter) {.raises: [QuicError].} =
|
||||
if fs.isEOF() and not fs.sentEof:
|
||||
# empty sequence is sent to unblock reading from incoming queue
|
||||
try:
|
||||
fs.incoming.putNoWait(@[])
|
||||
fs.sentEof = true
|
||||
except AsyncQueueFullError:
|
||||
raise newException(QuicError, "Incoming queue is full")
|
||||
|
||||
proc putToQueue(fs: var FrameSorter, data: seq[byte]) {.raises: [QuicError].} =
|
||||
if data.len > 0:
|
||||
try:
|
||||
fs.incoming.putNoWait(data)
|
||||
except AsyncQueueFullError:
|
||||
raise newException(QuicError, "Incoming queue is full")
|
||||
|
||||
fs.sendEof()
|
||||
|
||||
proc emitBufferedData(fs: var FrameSorter) {.raises: [QuicError].} =
|
||||
var emitData: seq[byte]
|
||||
while fs.buffer.hasKey(fs.emitPos):
|
||||
@@ -39,6 +57,10 @@ proc insert*(
|
||||
) {.raises: [QuicError].} =
|
||||
if isFin and fs.totalBytes.isNone:
|
||||
fs.totalBytes = Opt.some(offset.int64 + max(data.len - 1, 0))
|
||||
defer:
|
||||
# send EOF in defer so that it happens after
|
||||
# data is written to incoming queue (if any)
|
||||
fs.sendEof()
|
||||
|
||||
# if offset matches emit position, framesorter can emit entire input in batch
|
||||
if offset.int == fs.emitPos and data.len > 0:
|
||||
@@ -70,17 +92,12 @@ proc insert*(
|
||||
# Try to emit contiguous data
|
||||
fs.emitBufferedData()
|
||||
|
||||
proc isEOF*(fs: FrameSorter): bool =
|
||||
if fs.totalBytes.isNone:
|
||||
return false
|
||||
|
||||
return fs.emitPos >= fs.totalBytes.get()
|
||||
|
||||
proc reset*(fs: var FrameSorter) =
|
||||
fs.totalBytes = Opt.none(int64)
|
||||
fs.buffer.clear()
|
||||
fs.incoming.clear()
|
||||
fs.emitPos = 0
|
||||
fs.sentEof = false
|
||||
|
||||
proc isComplete*(fs: FrameSorter): bool =
|
||||
if fs.totalBytes.isNone:
|
||||
|
||||
82
tests/quic/testPerf.nim
Normal file
82
tests/quic/testPerf.nim
Normal file
@@ -0,0 +1,82 @@
|
||||
import std/sequtils
|
||||
import pkg/chronos
|
||||
import pkg/chronos/unittest2/asynctests
|
||||
import pkg/quic/errors
|
||||
import pkg/quic/transport/stream
|
||||
import pkg/quic/transport/quicconnection
|
||||
import pkg/quic/transport/ngtcp2/native
|
||||
import pkg/quic/udp/datagram
|
||||
import pkg/stew/endians2
|
||||
import ../helpers/simulation
|
||||
|
||||
suite "perf protocol simulation":
|
||||
setup:
|
||||
var (client, server) = waitFor performHandshake()
|
||||
|
||||
teardown:
|
||||
waitFor client.drop()
|
||||
waitFor server.drop()
|
||||
|
||||
asyncTest "test":
|
||||
# This test simulates the exact perf protocol flow:
|
||||
# 1. Client sends 8 bytes (download size)
|
||||
# 2. Client sends upload data (100KB)
|
||||
# 3. Client calls closeWrite()
|
||||
# 4. Server reads all data including the closeWrite signal (should get EOF)
|
||||
# 5. Server sends download data back
|
||||
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let clientStream = await client.openStream()
|
||||
|
||||
const
|
||||
uploadSize = 100000 # 100KB like in perf test
|
||||
downloadSize = 10000000 # 10MB like in perf test
|
||||
chunkSize = 65536 # 64KB chunks like perf
|
||||
|
||||
proc serverHandler() {.async.} =
|
||||
let serverStream = await server.incomingStream()
|
||||
|
||||
# Step 1: Read download size (8 bytes)
|
||||
let clientDownloadSize = await serverStream.read()
|
||||
|
||||
# Step 2: Read upload data until EOF
|
||||
var totalBytesRead = 0
|
||||
while true:
|
||||
let chunk = await serverStream.read()
|
||||
if chunk.len == 0:
|
||||
break
|
||||
totalBytesRead += chunk.len
|
||||
|
||||
# Step 3: Send download data back
|
||||
var remainingToSend = uint64.fromBytesBE(clientDownloadSize)
|
||||
while remainingToSend > 0:
|
||||
let toSend = min(remainingToSend, chunkSize)
|
||||
await serverStream.write(newSeq[byte](toSend))
|
||||
remainingToSend -= toSend
|
||||
|
||||
await serverStream.close()
|
||||
|
||||
# Start server handler
|
||||
asyncSpawn serverHandler()
|
||||
|
||||
# Step 1: Send download size, activate stream first
|
||||
await clientStream.write(toSeq(downloadSize.uint64.toBytesBE()))
|
||||
|
||||
# Step 2: Send upload data in chunks
|
||||
var remainingToSend = uploadSize
|
||||
while remainingToSend > 0:
|
||||
let toSend = min(remainingToSend, chunkSize)
|
||||
await clientStream.write(newSeq[byte](toSend))
|
||||
remainingToSend -= toSend
|
||||
|
||||
# Step 3: Close write side
|
||||
await clientStream.closeWrite()
|
||||
|
||||
# Step 4: Start reading download data
|
||||
var totalDownloaded = 0
|
||||
while totalDownloaded < downloadSize:
|
||||
let chunk = await clientStream.read()
|
||||
totalDownloaded += chunk.len
|
||||
|
||||
await clientStream.close()
|
||||
await simulation.cancelAndWait()
|
||||
@@ -16,5 +16,6 @@ import ./quic/testListener
|
||||
import ./quic/testApi
|
||||
import ./quic/testExample
|
||||
import ./quic/testFramesorter
|
||||
import ./quic/testPerf
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
||||
Reference in New Issue
Block a user