mirror of
https://github.com/vacp2p/nim-quic.git
synced 2026-01-08 21:38:05 -05:00
refactor: stream state (#96)
This commit is contained in:
@@ -22,6 +22,7 @@ export incomingStream
|
||||
export read
|
||||
export write
|
||||
export closeWrite
|
||||
export closeRead
|
||||
export stop
|
||||
export drop
|
||||
export close
|
||||
@@ -34,6 +35,7 @@ export CustomCertificateVerifier
|
||||
export InsecureCertificateVerifier
|
||||
export init
|
||||
export TimeOutError
|
||||
export ClosedStreamError
|
||||
export certificates
|
||||
|
||||
type TLSConfig* = object
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
type
|
||||
QuicError* = object of IOError
|
||||
QuicDefect* = object of Defect
|
||||
QuicConfigError* = object of CatchableError
|
||||
QuicError* = object of IOError
|
||||
ClosedStreamError* = object of QuicError
|
||||
|
||||
template errorAsDefect*(body): untyped =
|
||||
try:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import ngtcp2
|
||||
import ../../../helpers/openarray
|
||||
import ../../stream
|
||||
import ../stream/openstate
|
||||
import ../stream/openstream
|
||||
import ./connection
|
||||
import chronicles
|
||||
|
||||
|
||||
10
quic/transport/ngtcp2/stream/basestream.nim
Normal file
10
quic/transport/ngtcp2/stream/basestream.nim
Normal file
@@ -0,0 +1,10 @@
|
||||
import ../../../basics
|
||||
import ../../framesorter
|
||||
import ../../stream
|
||||
import ../native/connection
|
||||
|
||||
type BaseStream* = ref object of StreamState
|
||||
stream*: Opt[Stream]
|
||||
incoming*: AsyncQueue[seq[byte]]
|
||||
connection*: Ngtcp2Connection
|
||||
frameSorter*: FrameSorter
|
||||
@@ -1,23 +1,16 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import chronicles
|
||||
import ./basestream
|
||||
|
||||
logScope:
|
||||
topics = "closed state"
|
||||
|
||||
type
|
||||
ClosedStream* = ref object of StreamState
|
||||
remaining: AsyncQueue[seq[byte]]
|
||||
frameSorter: FrameSorter
|
||||
wasReset: bool
|
||||
|
||||
ClosedStreamError* = object of StreamError
|
||||
type ClosedStream* = ref object of BaseStream
|
||||
wasReset: bool
|
||||
|
||||
proc newClosedStream*(
|
||||
remaining: AsyncQueue[seq[byte]], frameSorter: FrameSorter, wasReset: bool = false
|
||||
incoming: AsyncQueue[seq[byte]], frameSorter: FrameSorter, wasReset: bool = false
|
||||
): ClosedStream =
|
||||
ClosedStream(remaining: remaining, wasReset: wasReset)
|
||||
ClosedStream(incoming: incoming, wasReset: wasReset)
|
||||
|
||||
method enter*(state: ClosedStream, stream: Stream) =
|
||||
discard
|
||||
@@ -31,15 +24,14 @@ method read*(state: ClosedStream): Future[seq[byte]] {.async.} =
|
||||
raise newException(ClosedStreamError, "stream was reset")
|
||||
|
||||
try:
|
||||
return state.remaining.popFirstNoWait()
|
||||
return state.incoming.popFirstNoWait()
|
||||
except AsyncQueueEmptyError:
|
||||
discard
|
||||
|
||||
# When no more data is available, return EOF instead of throwing exception
|
||||
return @[] # Return EOF for closed streams
|
||||
return @[]
|
||||
|
||||
method write*(state: ClosedStream, bytes: seq[byte]) {.async.} =
|
||||
trace "cant write, stream is closed"
|
||||
raise newException(ClosedStreamError, "stream is closed")
|
||||
|
||||
method close*(state: ClosedStream) {.async.} =
|
||||
@@ -48,6 +40,9 @@ method close*(state: ClosedStream) {.async.} =
|
||||
method closeWrite*(state: ClosedStream) {.async.} =
|
||||
discard
|
||||
|
||||
method closeRead*(state: ClosedStream) {.async.} =
|
||||
discard
|
||||
|
||||
method onClose*(state: ClosedStream) =
|
||||
discard
|
||||
|
||||
@@ -6,12 +6,10 @@ import ../native/connection
|
||||
proc allowMoreIncomingBytes*(
|
||||
stream: Opt[stream.Stream], connection: Ngtcp2Connection, amount: uint64
|
||||
) =
|
||||
if stream.isSome:
|
||||
let stream = stream.get()
|
||||
connection.extendStreamOffset(stream.id, amount)
|
||||
connection.send()
|
||||
else:
|
||||
trace "no stream available"
|
||||
let stream = stream.valueOr:
|
||||
return
|
||||
connection.extendStreamOffset(stream.id, amount)
|
||||
connection.send()
|
||||
|
||||
proc setUserData*(
|
||||
stream: Opt[stream.Stream], connection: Ngtcp2Connection, userdata: pointer
|
||||
@@ -19,3 +17,8 @@ proc setUserData*(
|
||||
let stream = stream.valueOr:
|
||||
return
|
||||
connection.setStreamUserData(stream.id, userdata)
|
||||
|
||||
proc expire*(stream: Opt[stream.Stream]) =
|
||||
let stream = stream.valueOr:
|
||||
return
|
||||
stream.closed.fire()
|
||||
|
||||
@@ -1,171 +0,0 @@
|
||||
import ../../../basics
|
||||
import ../../framesorter
|
||||
import ../../stream
|
||||
import ./helpers
|
||||
import ../native/connection
|
||||
import ./closedstate
|
||||
import chronicles
|
||||
|
||||
logScope:
|
||||
topics = "open state"
|
||||
|
||||
type OpenStream* = ref object of StreamState
|
||||
stream*: Opt[Stream]
|
||||
incoming*: AsyncQueue[seq[byte]]
|
||||
connection*: Ngtcp2Connection
|
||||
frameSorter*: FrameSorter
|
||||
closeFut*: Future[string]
|
||||
writeFinSent*: bool
|
||||
readClosed*: bool
|
||||
|
||||
proc newOpenStream*(connection: Ngtcp2Connection): OpenStream =
|
||||
let incomingQ = newAsyncQueue[seq[byte]]()
|
||||
OpenStream(
|
||||
connection: connection,
|
||||
incoming: incomingQ,
|
||||
closeFut: newFuture[string](),
|
||||
frameSorter: initFrameSorter(incomingQ),
|
||||
writeFinSent: false,
|
||||
readClosed: false,
|
||||
)
|
||||
|
||||
method enter*(state: OpenStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, unsafeAddr state[])
|
||||
|
||||
method leave*(state: OpenStream) =
|
||||
setUserData(state.stream, state.connection, nil)
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: OpenStream): Future[seq[byte]] {.async.} =
|
||||
# RFC 9000 compliant stream reading logic
|
||||
# Priority 1: Check for immediate EOF conditions
|
||||
if state.frameSorter.isEOF() and state.incoming.len == 0:
|
||||
# Remote sent FIN and no more data - check if we should switch to ClosedStream
|
||||
if state.readClosed:
|
||||
# Both remote FIN received and local read closed - switch to ClosedStream
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
|
||||
|
||||
# Priority 2: Check if local read is closed but there's still buffered data
|
||||
if state.readClosed and state.incoming.len == 0:
|
||||
# Local read closed and no buffered data - switch to ClosedStream
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
return @[] # Return EOF for locally closed read
|
||||
|
||||
# Priority 3: Get data from incoming queue
|
||||
let data = await state.incoming.get()
|
||||
|
||||
# If we got real data, return it with flow control update
|
||||
if data.len > 0:
|
||||
allowMoreIncomingBytes(state.stream, state.connection, data.len.uint64)
|
||||
return data
|
||||
|
||||
# If we got empty data (len == 0), check if this is EOF
|
||||
if data.len == 0 and state.frameSorter.isEOF():
|
||||
# This is EOF - stream has been closed with FIN bit from remote
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
# If local read is also closed, switch to ClosedStream
|
||||
if state.readClosed:
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
return @[] # Return EOF per RFC 9000
|
||||
|
||||
# If local read is closed but we got empty data (not EOF), return EOF
|
||||
if state.readClosed:
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
return @[] # Return EOF for locally closed read
|
||||
|
||||
# Empty data but no EOF - this shouldn't happen in normal operation
|
||||
# Continue reading for more data
|
||||
return await state.read()
|
||||
|
||||
method write*(state: OpenStream, bytes: seq[byte]): Future[void] =
|
||||
if state.writeFinSent:
|
||||
let fut = newFuture[void]()
|
||||
fut.fail(newException(StreamError, "write side is closed"))
|
||||
return fut
|
||||
# let stream = state.stream.valueOr:
|
||||
# raise newException(QuicError, "stream is closed")
|
||||
# See https://github.com/status-im/nim-quic/pull/41 for more details
|
||||
state.connection.send(state.stream.get.id, bytes)
|
||||
|
||||
method close*(state: OpenStream) {.async.} =
|
||||
## Close both write and read sides of the stream
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
# Close write side by sending FIN
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
state.writeFinSent = true
|
||||
|
||||
# Close read side locally
|
||||
state.readClosed = true
|
||||
|
||||
# Don't switch to ClosedStream immediately - let read() handle the transition
|
||||
# when all buffered data is consumed
|
||||
|
||||
method closeWrite*(state: OpenStream) {.async.} =
|
||||
## Close write side by sending FIN, but keep read side open
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
state.writeFinSent = true
|
||||
# Note: we don't switch to ClosedStream here - read side stays open for half-close
|
||||
|
||||
method reset*(state: OpenStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.closeFut.complete("stream reset")
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true))
|
||||
|
||||
method onClose*(state: OpenStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
# Wake up pending read() operations before switching states
|
||||
# This fixes race condition when ngtcp2 calls onClose() while read() is waiting
|
||||
try:
|
||||
state.incoming.putNoWait(@[]) # Send EOF marker to wake up pending reads
|
||||
except AsyncQueueFullError:
|
||||
# Queue is full, that's fine - there's already data to process
|
||||
discard
|
||||
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method isClosed*(state: OpenStream): bool =
|
||||
false
|
||||
|
||||
method receive*(state: OpenStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.frameSorter.insert(offset, bytes, isFin)
|
||||
|
||||
if state.frameSorter.isComplete():
|
||||
stream.closed.fire()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
elif isFin and bytes.len == 0 and state.frameSorter.isEOF():
|
||||
# Special handling: FIN with no data and we've reached EOF
|
||||
# Peer has finished sending data, but we don't switch to ClosedStream automatically
|
||||
# because we might still need to write back (half-close scenario)
|
||||
# Don't switch to ClosedStream - stay in OpenStream so we can still write
|
||||
discard
|
||||
|
||||
method expire*(state: OpenStream) {.raises: [].} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
state.closeFut.complete("connection timed out")
|
||||
stream.closed.fire()
|
||||
107
quic/transport/ngtcp2/stream/openstream.nim
Normal file
107
quic/transport/ngtcp2/stream/openstream.nim
Normal file
@@ -0,0 +1,107 @@
|
||||
import ../../../basics
|
||||
import ../../framesorter
|
||||
import ../../stream
|
||||
import ../native/connection
|
||||
import ./basestream
|
||||
import ./closestream
|
||||
import ./receivestream
|
||||
import ./sendstream
|
||||
import ./helpers
|
||||
|
||||
type OpenStream* = ref object of BaseStream
|
||||
|
||||
proc newOpenStream*(connection: Ngtcp2Connection): OpenStream =
|
||||
let incomingQ = newAsyncQueue[seq[byte]]()
|
||||
OpenStream(
|
||||
connection: connection, incoming: incomingQ, frameSorter: initFrameSorter(incomingQ)
|
||||
)
|
||||
|
||||
method enter*(state: OpenStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, unsafeAddr state[])
|
||||
|
||||
method leave*(state: OpenStream) =
|
||||
setUserData(state.stream, state.connection, nil)
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: OpenStream): Future[seq[byte]] {.async.} =
|
||||
# Check for immediate EOF conditions
|
||||
if state.frameSorter.isEOF() and state.incoming.len == 0:
|
||||
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
|
||||
|
||||
# Get data from incoming queue
|
||||
let data = await state.incoming.get()
|
||||
|
||||
# If we got real data, return it with flow control update
|
||||
if data.len > 0:
|
||||
allowMoreIncomingBytes(state.stream, state.connection, data.len.uint64)
|
||||
return data
|
||||
|
||||
# If we got empty data (len == 0), check if this is EOF
|
||||
if data.len == 0 and state.frameSorter.isEOF():
|
||||
return @[] # Return EOF per RFC 9000
|
||||
|
||||
# Empty data but no EOF; continue reading for more data
|
||||
return await state.read()
|
||||
|
||||
method write*(state: OpenStream, bytes: seq[byte]): Future[void] =
|
||||
state.connection.send(state.stream.get.id, bytes)
|
||||
|
||||
method close*(state: OpenStream) {.async.} =
|
||||
# Bidirectional streams, close() only closes the send side of the stream.
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newReceiveStream(state.connection, state.incoming, state.frameSorter))
|
||||
|
||||
method closeWrite*(state: OpenStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newReceiveStream(state.connection, state.incoming, state.frameSorter))
|
||||
|
||||
method closeRead*(state: OpenStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newSendStream(state.connection, state.incoming, state.frameSorter))
|
||||
|
||||
method onClose*(state: OpenStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
# Wake up pending read() operations before switching states
|
||||
# This fixes race condition when ngtcp2 calls onClose() while read() is waiting
|
||||
try:
|
||||
state.incoming.putNoWait(@[]) # Send EOF marker to wake up pending reads
|
||||
except AsyncQueueFullError:
|
||||
# Queue is full, that's fine - there's already data to process
|
||||
discard
|
||||
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method isClosed*(state: OpenStream): bool =
|
||||
false
|
||||
|
||||
method receive*(state: OpenStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.frameSorter.insert(offset, bytes, isFin)
|
||||
|
||||
if state.frameSorter.isComplete():
|
||||
stream.closed.fire()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method reset*(state: OpenStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true))
|
||||
|
||||
method expire*(state: OpenStream) {.raises: [].} =
|
||||
expire(state.stream)
|
||||
110
quic/transport/ngtcp2/stream/receivestream.nim
Normal file
110
quic/transport/ngtcp2/stream/receivestream.nim
Normal file
@@ -0,0 +1,110 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import ../native/connection
|
||||
import ./basestream
|
||||
import ./closestream
|
||||
import ./helpers
|
||||
|
||||
type ReceiveStream* = ref object of BaseStream
|
||||
|
||||
proc newReceiveStream*(
|
||||
connection: Ngtcp2Connection,
|
||||
incoming: AsyncQueue[seq[byte]],
|
||||
frameSorter: FrameSorter,
|
||||
): ReceiveStream =
|
||||
ReceiveStream(connection: connection, incoming: incoming, frameSorter: frameSorter)
|
||||
|
||||
method enter*(state: ReceiveStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, unsafeAddr state[])
|
||||
|
||||
method leave*(state: ReceiveStream) =
|
||||
setUserData(state.stream, state.connection, nil)
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: ReceiveStream): Future[seq[byte]] {.async.} =
|
||||
# Check for immediate EOF conditions
|
||||
if state.frameSorter.isEOF() and state.incoming.len == 0:
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
|
||||
|
||||
# Get data from incoming queue
|
||||
let data = await state.incoming.get()
|
||||
|
||||
# If we got real data, return it with flow control update
|
||||
if data.len > 0:
|
||||
allowMoreIncomingBytes(state.stream, state.connection, data.len.uint64)
|
||||
return data
|
||||
|
||||
# If we got empty data (len == 0), check if this is EOF
|
||||
if data.len == 0 and state.frameSorter.isEOF():
|
||||
# This is EOF - stream has been closed with FIN bit from remote
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
# If local read is also closed, switch to ClosedStream
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
return @[] # Return EOF per RFC 9000
|
||||
|
||||
# Empty data but no EOF; continue reading for more data
|
||||
return await state.read()
|
||||
|
||||
method write*(state: ReceiveStream, bytes: seq[byte]) {.async.} =
|
||||
raise newException(ClosedStreamError, "write side is closed")
|
||||
|
||||
method close*(state: ReceiveStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method closeWrite*(state: ReceiveStream) {.async.} =
|
||||
discard
|
||||
|
||||
method closeRead*(state: ReceiveStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method onClose*(state: ReceiveStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
# Wake up pending read() operations before switching states
|
||||
# This fixes race condition when ngtcp2 calls onClose() while read() is waiting
|
||||
try:
|
||||
state.incoming.putNoWait(@[]) # Send EOF marker to wake up pending reads
|
||||
except AsyncQueueFullError:
|
||||
# Queue is full, that's fine - there's already data to process
|
||||
discard
|
||||
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method isClosed*(state: ReceiveStream): bool =
|
||||
false
|
||||
|
||||
method receive*(state: ReceiveStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.frameSorter.insert(offset, bytes, isFin)
|
||||
|
||||
if state.frameSorter.isComplete():
|
||||
stream.closed.fire()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method reset*(state: ReceiveStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true))
|
||||
|
||||
method expire*(state: ReceiveStream) {.raises: [].} =
|
||||
expire(state.stream)
|
||||
69
quic/transport/ngtcp2/stream/sendstream.nim
Normal file
69
quic/transport/ngtcp2/stream/sendstream.nim
Normal file
@@ -0,0 +1,69 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import ../native/connection
|
||||
import ./basestream
|
||||
import ./closestream
|
||||
import ./helpers
|
||||
|
||||
type SendStream* = ref object of BaseStream
|
||||
|
||||
proc newSendStream*(
|
||||
connection: Ngtcp2Connection,
|
||||
incoming: AsyncQueue[seq[byte]],
|
||||
frameSorter: FrameSorter,
|
||||
): SendStream =
|
||||
SendStream(connection: connection, incoming: incoming, frameSorter: frameSorter)
|
||||
|
||||
method enter*(state: SendStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, unsafeAddr state[])
|
||||
|
||||
method leave*(state: SendStream) =
|
||||
setUserData(state.stream, state.connection, nil)
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: SendStream): Future[seq[byte]] {.async.} =
|
||||
raise newException(ClosedStreamError, "read side is closed")
|
||||
|
||||
method write*(state: SendStream, bytes: seq[byte]) {.async.} =
|
||||
await state.connection.send(state.stream.get.id, bytes)
|
||||
|
||||
method close*(state: SendStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method closeWrite*(state: SendStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method closeRead*(stream: SendStream) {.async.} =
|
||||
discard
|
||||
|
||||
method onClose*(state: SendStream) =
|
||||
discard
|
||||
|
||||
method isClosed*(state: SendStream): bool =
|
||||
false
|
||||
|
||||
method receive*(state: SendStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
discard
|
||||
|
||||
method reset*(state: SendStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true))
|
||||
|
||||
method expire*(state: SendStream) {.raises: [].} =
|
||||
expire(state.stream)
|
||||
@@ -33,6 +33,9 @@ method close*(state: StreamState) {.base, async.} =
|
||||
method closeWrite*(state: StreamState) {.base, async.} =
|
||||
doAssert false, "override this method"
|
||||
|
||||
method closeRead*(state: StreamState) {.base, async.} =
|
||||
doAssert false, "override this method"
|
||||
|
||||
method reset*(state: StreamState) {.base.} =
|
||||
doAssert false, "override this method"
|
||||
|
||||
@@ -77,6 +80,9 @@ proc close*(stream: Stream) {.async.} =
|
||||
proc closeWrite*(stream: Stream) {.async.} =
|
||||
await stream.state.closeWrite()
|
||||
|
||||
proc closeRead*(stream: Stream) {.async.} =
|
||||
await stream.state.closeRead()
|
||||
|
||||
proc reset*(stream: Stream) =
|
||||
stream.state.reset()
|
||||
|
||||
|
||||
@@ -9,6 +9,12 @@ import pkg/quic/udp/datagram
|
||||
import ../helpers/simulation
|
||||
import ../helpers/contains
|
||||
|
||||
proc newData(size: int, val: uint8 = uint8(0xEE)): seq[uint8] =
|
||||
var data = newSeq[uint8](size)
|
||||
for i in 0 ..< size:
|
||||
data[i] = val
|
||||
return data
|
||||
|
||||
suite "streams":
|
||||
setup:
|
||||
var (client, server) = waitFor performHandshake()
|
||||
@@ -77,8 +83,7 @@ suite "streams":
|
||||
let serverStream = await server.incomingStream()
|
||||
check clientStream.id == serverStream.id
|
||||
|
||||
let incoming = await serverStream.read()
|
||||
check incoming == message
|
||||
check (await serverStream.read()) == message
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
@@ -124,9 +129,7 @@ suite "streams":
|
||||
await clientStream.write(message)
|
||||
|
||||
let serverStream = await server.incomingStream()
|
||||
let incoming = await serverStream.read()
|
||||
|
||||
check incoming == message
|
||||
check (await serverStream.read()) == message
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
@@ -145,8 +148,7 @@ suite "streams":
|
||||
await sleepAsync(100.milliseconds) # wait for stream to be closed
|
||||
|
||||
# Reading from a closed stream should return EOF, not throw exception
|
||||
let eof = await serverStream.read()
|
||||
check eof.len == 0 # Should return EOF (empty array)
|
||||
check (await serverStream.read()).len == 0
|
||||
|
||||
# In QUIC, receiving FIN doesn't prevent writing back (half-close semantics)
|
||||
# Writing should still work unless the local side is closed
|
||||
@@ -185,23 +187,56 @@ suite "streams":
|
||||
await sleepAsync(100.milliseconds) # wait for stream to be closed
|
||||
|
||||
let serverStream = await server.incomingStream()
|
||||
let incoming = await serverStream.read()
|
||||
check incoming == message
|
||||
check (await serverStream.read()) == message
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
asyncTest "closeWrite() prevents further writes":
|
||||
asyncTest "closeWrite() basic test":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let message = @[1'u8, 2'u8, 3'u8]
|
||||
|
||||
let clientStream = await client.openStream()
|
||||
await clientStream.write(message)
|
||||
await clientStream.write(@[]) # Activate stream
|
||||
let serverStream = await server.incomingStream()
|
||||
|
||||
# client sends data and closes write side
|
||||
await clientStream.write(newData(5))
|
||||
await clientStream.closeWrite()
|
||||
expect ClosedStreamError:
|
||||
await clientStream.write(@[])
|
||||
|
||||
# Writing after closeWrite should fail
|
||||
expect QuicError:
|
||||
await clientStream.write(@[4'u8, 5'u8, 6'u8])
|
||||
check (await serverStream.read()) == newData(5)
|
||||
for i in 0 ..< 10:
|
||||
check (await serverStream.read()).len == 0
|
||||
|
||||
# client can still read
|
||||
await serverStream.write(newData(3))
|
||||
check (await clientStream.read()) == newData(3)
|
||||
|
||||
await serverStream.close()
|
||||
await clientStream.close()
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
asyncTest "closeRead() basic test":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let clientStream = await client.openStream()
|
||||
await clientStream.write(@[]) # Activate stream
|
||||
let serverStream = await server.incomingStream()
|
||||
|
||||
# closed for read
|
||||
await clientStream.closeRead()
|
||||
expect ClosedStreamError:
|
||||
discard await clientStream.read()
|
||||
|
||||
for i in 0 ..< 10:
|
||||
await serverStream.write(newData(3))
|
||||
expect ClosedStreamError:
|
||||
discard await clientStream.read()
|
||||
|
||||
# open for write
|
||||
await clientStream.write(newData(5))
|
||||
check (await serverStream.read()) == newData(5)
|
||||
|
||||
await serverStream.close()
|
||||
await clientStream.close()
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
asyncTest "closeWrite() sends FIN but allows server to write back":
|
||||
@@ -216,15 +251,13 @@ suite "streams":
|
||||
|
||||
# Server reads client message
|
||||
let serverStream = await server.incomingStream()
|
||||
let incoming = await serverStream.read()
|
||||
check incoming == clientMessage
|
||||
check (await serverStream.read()) == clientMessage
|
||||
|
||||
# Server should still be able to write back
|
||||
await serverStream.write(serverMessage)
|
||||
|
||||
# Client should be able to read server's response
|
||||
let response = await clientStream.read()
|
||||
check response == serverMessage
|
||||
check (await clientStream.read()) == serverMessage
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
@@ -269,8 +302,7 @@ suite "streams":
|
||||
await clientStream.closeWrite()
|
||||
|
||||
let serverStream = await server.incomingStream()
|
||||
let received = await serverStream.read()
|
||||
check received == uploadData
|
||||
check (await serverStream.read()) == uploadData
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
@@ -302,22 +334,20 @@ suite "streams":
|
||||
await clientStream.closeWrite()
|
||||
|
||||
let serverStream = await server.incomingStream()
|
||||
let received = await serverStream.read()
|
||||
check received == uploadData
|
||||
check (await serverStream.read()) == uploadData
|
||||
|
||||
# Server sends response back
|
||||
var downloadData = @[11'u8, 12'u8, 13'u8, 14'u8, 15'u8]
|
||||
await serverStream.write(downloadData)
|
||||
await serverStream.closeWrite()
|
||||
|
||||
let response = await clientStream.read()
|
||||
check response == downloadData
|
||||
check (await clientStream.read()) == downloadData
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
asyncTest "large data transfers with empty write activation work":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
var largeData = newSeq[uint8](1000)
|
||||
var largeData = newData(1000)
|
||||
for i in 0 ..< 1000:
|
||||
largeData[i] = uint8(i mod 256)
|
||||
|
||||
@@ -327,8 +357,7 @@ suite "streams":
|
||||
await clientStream.closeWrite()
|
||||
|
||||
let serverStream = await server.incomingStream()
|
||||
let received = await serverStream.read()
|
||||
check received == largeData
|
||||
check (await serverStream.read()) == largeData
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
@@ -349,14 +378,11 @@ suite "streams":
|
||||
var allReceived: seq[uint8]
|
||||
|
||||
# Read all chunks
|
||||
let received1 = await serverStream.read()
|
||||
allReceived.add(received1)
|
||||
allReceived.add(await serverStream.read())
|
||||
|
||||
try:
|
||||
let received2 = await serverStream.read()
|
||||
allReceived.add(received2)
|
||||
let received3 = await serverStream.read()
|
||||
allReceived.add(received3)
|
||||
allReceived.add(await serverStream.read())
|
||||
allReceived.add(await serverStream.read())
|
||||
except:
|
||||
# May receive combined chunks due to TCP-like behavior
|
||||
discard
|
||||
@@ -377,16 +403,12 @@ suite "streams":
|
||||
|
||||
# Server reads data
|
||||
let serverStream = await server.incomingStream()
|
||||
let received = await serverStream.read()
|
||||
check received == testData
|
||||
|
||||
check (await serverStream.read()) == testData
|
||||
# Second read should return EOF (empty array)
|
||||
let eof = await serverStream.read()
|
||||
check eof.len == 0 # EOF should be empty array
|
||||
|
||||
# Third read should also return EOF
|
||||
let eof2 = await serverStream.read()
|
||||
check eof2.len == 0 # Multiple EOF reads should work
|
||||
check (await serverStream.read()).len == 0
|
||||
# Third read should also return EOF (multiple EOF reads should work)
|
||||
check (await serverStream.read()).len == 0
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
@@ -402,24 +424,18 @@ suite "streams":
|
||||
|
||||
# Server receives the request
|
||||
let serverStream = await server.incomingStream()
|
||||
let receivedRequest = await serverStream.read()
|
||||
check receivedRequest == request
|
||||
|
||||
check (await serverStream.read()) == request
|
||||
# Server detects end of request (EOF)
|
||||
let requestEof = await serverStream.read()
|
||||
check requestEof.len == 0 # End of request
|
||||
check (await serverStream.read()).len == 0
|
||||
|
||||
# Server processes and sends response (can still write back!)
|
||||
await serverStream.write(response)
|
||||
await serverStream.close() # Server finishes and closes completely
|
||||
|
||||
# Client reads the response
|
||||
let receivedResponse = await clientStream.read()
|
||||
check receivedResponse == response
|
||||
|
||||
check (await clientStream.read()) == response
|
||||
# Client detects end of response
|
||||
let responseEof = await clientStream.read()
|
||||
check responseEof.len == 0 # End of response
|
||||
check (await clientStream.read()).len == 0
|
||||
|
||||
# At this point both sides have received what they need
|
||||
# Client can't write (closeWrite called), server can't write (close called)
|
||||
@@ -451,11 +467,8 @@ suite "streams":
|
||||
await clientStream.write(@[6'u8, 7, 8])
|
||||
|
||||
# Server should receive data and EOF
|
||||
let receivedData = await serverStream.read()
|
||||
check receivedData == clientData
|
||||
|
||||
let eof = await serverStream.read()
|
||||
check eof.len == 0 # EOF
|
||||
check (await serverStream.read()) == clientData
|
||||
check (await serverStream.read()).len == 0 # EOF
|
||||
|
||||
# Server can still write back (until it receives indication that client closed read)
|
||||
# But in QUIC when close() is called, it closes ALL directions
|
||||
@@ -474,7 +487,6 @@ suite "streams":
|
||||
# Both send data
|
||||
let clientData = @[1'u8, 2, 3]
|
||||
let serverData = @[4'u8, 5, 6]
|
||||
|
||||
await clientStream.write(clientData)
|
||||
await serverStream.write(serverData)
|
||||
|
||||
@@ -489,18 +501,11 @@ suite "streams":
|
||||
await serverStream.write(@[8'u8])
|
||||
|
||||
# But both can read each other's data
|
||||
let receivedByClient = await clientStream.read()
|
||||
let receivedByServer = await serverStream.read()
|
||||
|
||||
check receivedByClient == serverData
|
||||
check receivedByServer == clientData
|
||||
|
||||
check (await clientStream.read()) == serverData
|
||||
check (await serverStream.read()) == clientData
|
||||
# And both receive EOF
|
||||
let eofClient = await clientStream.read()
|
||||
let eofServer = await serverStream.read()
|
||||
|
||||
check eofClient.len == 0
|
||||
check eofServer.len == 0
|
||||
check (await clientStream.read()).len == 0
|
||||
check (await serverStream.read()).len == 0
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
@@ -521,8 +526,7 @@ suite "streams":
|
||||
await serverStream.write(serverData)
|
||||
|
||||
# Client reads response
|
||||
let response = await clientStream.read()
|
||||
check response == serverData
|
||||
check (await clientStream.read()) == serverData
|
||||
|
||||
# Now client fully closes stream
|
||||
await clientStream.close()
|
||||
@@ -550,12 +554,9 @@ suite "streams":
|
||||
await serverStream.close()
|
||||
|
||||
# Client should receive data from server
|
||||
let receivedByClient = await clientStream.read()
|
||||
check receivedByClient == serverData
|
||||
|
||||
check (await clientStream.read()) == serverData
|
||||
# And EOF
|
||||
let eofClient = await clientStream.read()
|
||||
check eofClient.len == 0
|
||||
check (await clientStream.read()).len == 0
|
||||
|
||||
# Server should also receive data from client (before its close())
|
||||
let receivedByServer = await serverStream.read()
|
||||
@@ -594,19 +595,11 @@ suite "streams":
|
||||
asyncTest "simple 10MB write test":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let dataSize = 10 * 1024 * 1024 # 10 MB
|
||||
var testData = newSeq[uint8](dataSize)
|
||||
|
||||
# Fill with pattern
|
||||
for i in 0 ..< dataSize:
|
||||
testData[i] = uint8(i mod 256)
|
||||
var testData = newData(dataSize, uint8(0xAA))
|
||||
|
||||
let clientStream = await client.openStream()
|
||||
let serverStreamFuture = server.incomingStream()
|
||||
|
||||
# Activate stream
|
||||
await clientStream.write(@[])
|
||||
|
||||
let serverStream = await serverStreamFuture
|
||||
await clientStream.write(@[]) # Activate stream
|
||||
let serverStream = await server.incomingStream()
|
||||
|
||||
# Server starts reading IMMEDIATELY (parallel with client writing)
|
||||
proc serverReadData(): Future[seq[uint8]] {.async.} =
|
||||
@@ -639,24 +632,12 @@ suite "streams":
|
||||
asyncTest "bidirectional 10MB + 10MB closeWrite test":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let dataSize = 10 * 1024 * 1024 # 10 MB each direction
|
||||
|
||||
# Client data pattern
|
||||
var clientData = newSeq[uint8](dataSize)
|
||||
for i in 0 ..< dataSize:
|
||||
clientData[i] = uint8(0xAA) # Client pattern
|
||||
|
||||
# Server data pattern
|
||||
var serverData = newSeq[uint8](dataSize)
|
||||
for i in 0 ..< dataSize:
|
||||
serverData[i] = uint8(0xBB) # Server pattern
|
||||
var clientData = newData(dataSize, uint8(0xAA))
|
||||
var serverData = newData(dataSize, uint8(0xBB))
|
||||
|
||||
let clientStream = await client.openStream()
|
||||
let serverStreamFuture = server.incomingStream()
|
||||
|
||||
# Activate stream
|
||||
await clientStream.write(@[])
|
||||
|
||||
let serverStream = await serverStreamFuture
|
||||
await clientStream.write(@[]) # Activate stream
|
||||
let serverStream = await server.incomingStream()
|
||||
|
||||
# Start parallel read operations for both directions
|
||||
proc clientReadData(): Future[seq[uint8]] {.async.} =
|
||||
@@ -719,10 +700,8 @@ suite "streams":
|
||||
check serverDataValid
|
||||
|
||||
# Both sides should be able to detect EOF now
|
||||
let clientEOF = await clientStream.read()
|
||||
let serverEOF = await serverStream.read()
|
||||
check clientEOF.len == 0
|
||||
check serverEOF.len == 0
|
||||
check (await clientStream.read()).len == 0
|
||||
check (await serverStream.read()).len == 0
|
||||
|
||||
await serverStream.close()
|
||||
await clientStream.close()
|
||||
@@ -732,23 +711,12 @@ suite "streams":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let dataSize = 10 * 1024 * 1024 # 10 MB
|
||||
|
||||
# Client data pattern
|
||||
var clientData = newSeq[uint8](dataSize)
|
||||
for i in 0 ..< dataSize:
|
||||
clientData[i] = uint8(0xCC) # Client pattern
|
||||
|
||||
# Server data pattern
|
||||
var serverData = newSeq[uint8](dataSize)
|
||||
for i in 0 ..< dataSize:
|
||||
serverData[i] = uint8(0xDD) # Server pattern
|
||||
var clientData = newData(dataSize, uint8(0xCC))
|
||||
var serverData = newData(dataSize, uint8(0xDD))
|
||||
|
||||
let clientStream = await client.openStream()
|
||||
let serverStreamFuture = server.incomingStream()
|
||||
|
||||
# Activate stream
|
||||
await clientStream.write(@[])
|
||||
|
||||
let serverStream = await serverStreamFuture
|
||||
await clientStream.write(@[]) # Activate stream
|
||||
let serverStream = await server.incomingStream()
|
||||
|
||||
# Start parallel read operations
|
||||
proc clientReadData(): Future[seq[uint8]] {.async.} =
|
||||
@@ -811,8 +779,7 @@ suite "streams":
|
||||
check serverDataValid
|
||||
|
||||
# Client should get EOF when trying to read (server did full close)
|
||||
let clientEOF = await clientStream.read()
|
||||
check clientEOF.len == 0
|
||||
check (await clientStream.read()).len == 0
|
||||
|
||||
# Client should still be able to close its read side
|
||||
await clientStream.close()
|
||||
@@ -822,18 +789,11 @@ suite "streams":
|
||||
asyncTest "reverse order: client starts writing first, server reads parallel":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let dataSize = 10 * 1024 * 1024 # 10 MB
|
||||
|
||||
var testData = newSeq[uint8](dataSize)
|
||||
for i in 0 ..< dataSize:
|
||||
testData[i] = uint8(0xEE) # Pattern for this test
|
||||
var testData = newData(dataSize, uint8(0xEE))
|
||||
|
||||
let clientStream = await client.openStream()
|
||||
let serverStreamFuture = server.incomingStream()
|
||||
|
||||
# Activate stream
|
||||
await clientStream.write(@[])
|
||||
|
||||
let serverStream = await serverStreamFuture
|
||||
await clientStream.write(@[]) # Activate stream
|
||||
let serverStream = await server.incomingStream()
|
||||
|
||||
# CLIENT STARTS WRITING FIRST (non-blocking)
|
||||
let clientWriteTask = proc() {.async.} =
|
||||
@@ -874,10 +834,7 @@ suite "streams":
|
||||
break
|
||||
|
||||
check dataValid
|
||||
|
||||
# EOF check
|
||||
let eofCheck = await serverStream.read()
|
||||
check eofCheck.len == 0
|
||||
check (await serverStream.read()).len == 0
|
||||
|
||||
await serverStream.close()
|
||||
await clientStream.close()
|
||||
|
||||
Reference in New Issue
Block a user