mirror of
https://github.com/vacp2p/nim-quic.git
synced 2026-01-08 21:38:05 -05:00
stream states improvements (#107)
This commit is contained in:
7
.gitignore
vendored
7
.gitignore
vendored
@@ -3,4 +3,9 @@
|
||||
# Second rule is here to un-ignores all files with extension,
|
||||
# because it appears that vs code is skipping text search is some tests files without these rules.
|
||||
tests/**/test*[^.]*
|
||||
!tests/**/*.*
|
||||
!tests/**/*.*
|
||||
|
||||
# nimble local setup
|
||||
nimble.develop
|
||||
nimble.paths
|
||||
nimbledeps
|
||||
|
||||
@@ -112,8 +112,7 @@ proc dial*(
|
||||
)
|
||||
var connection: Connection
|
||||
proc onReceive(udp: DatagramTransport, remote: TransportAddress) {.async.} =
|
||||
let datagram = Datagram(data: udp.getMessage())
|
||||
connection.receive(datagram)
|
||||
connection.receive(Datagram(data: udp.getMessage()))
|
||||
|
||||
let udp = newDatagramTransport(onReceive)
|
||||
connection = newOutgoingConnection(tlsBackend, udp, address, self.rng)
|
||||
|
||||
@@ -2,20 +2,27 @@ import ../errors
|
||||
import std/tables
|
||||
import chronos
|
||||
|
||||
type FrameSorter* = object
|
||||
type FrameSorter* = ref object of RootRef
|
||||
buffer*: Table[int64, byte] # sparse byte storage
|
||||
emitPos*: int64 # where to emit data from
|
||||
incoming*: AsyncQueue[seq[byte]]
|
||||
totalBytes*: Opt[int64]
|
||||
# contains total bytes for frame; and is known once a FIN is received
|
||||
closed: bool
|
||||
|
||||
proc initFrameSorter*(incoming: AsyncQueue[seq[byte]]): FrameSorter =
|
||||
result.incoming = incoming
|
||||
result.buffer = initTable[int64, byte]()
|
||||
result.emitPos = 0
|
||||
result.totalBytes = Opt.none(int64)
|
||||
return FrameSorter(
|
||||
incoming: incoming,
|
||||
buffer: initTable[int64, byte](),
|
||||
emitPos: 0,
|
||||
totalBytes: Opt.none(int64),
|
||||
closed: false,
|
||||
)
|
||||
|
||||
proc isEOF*(fs: FrameSorter): bool =
|
||||
if fs.closed:
|
||||
return true
|
||||
|
||||
if fs.totalBytes.isNone:
|
||||
return false
|
||||
|
||||
@@ -50,9 +57,18 @@ proc emitBufferedData(fs: var FrameSorter) {.raises: [QuicError].} =
|
||||
|
||||
fs.putToQueue(emitData)
|
||||
|
||||
proc close*(fs: var FrameSorter) =
|
||||
if fs.closed:
|
||||
return
|
||||
fs.closed = true
|
||||
fs.sendEof()
|
||||
|
||||
proc insert*(
|
||||
fs: var FrameSorter, offset: uint64, data: seq[byte], isFin: bool
|
||||
) {.raises: [QuicError].} =
|
||||
if fs.closed:
|
||||
return
|
||||
|
||||
if isFin:
|
||||
fs.totalBytes = Opt.some(offset.int64 + max(data.len - 1, 0))
|
||||
defer:
|
||||
@@ -99,6 +115,9 @@ proc reset*(fs: var FrameSorter) =
|
||||
fs.emitPos = 0
|
||||
|
||||
proc isComplete*(fs: FrameSorter): bool =
|
||||
if fs.closed:
|
||||
return true
|
||||
|
||||
if fs.totalBytes.isNone:
|
||||
return false
|
||||
|
||||
|
||||
@@ -114,10 +114,9 @@ method receive(state: OpenConnection, datagram: Datagram) =
|
||||
errMsg = exc.msg
|
||||
trace "ngtcp2 error on receive", code = errCode, msg = errMsg
|
||||
finally:
|
||||
var isDraining = state.ngtcp2Connection.isDraining
|
||||
let quicConnection = state.quicConnection.valueOr:
|
||||
return
|
||||
if isDraining:
|
||||
if state.ngtcp2Connection.isDraining:
|
||||
let ids = state.ids
|
||||
let duration = state.ngtcp2Connection.closingDuration()
|
||||
let draining = newDrainingConnection(ids, duration, state.derCertificates)
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import ngtcp2
|
||||
import ../../../helpers/openarray
|
||||
import ../../stream
|
||||
import ../stream/openstream
|
||||
import ../streamstate/openstate
|
||||
import ./connection
|
||||
import chronicles
|
||||
|
||||
proc newStream(connection: Ngtcp2Connection, id: int64): Stream =
|
||||
newStream(id, newOpenStream(connection))
|
||||
newStream(id, newOpenStreamState(connection))
|
||||
|
||||
proc openStream*(connection: Ngtcp2Connection, unidirectional: bool): Stream =
|
||||
var id: int64
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
import ../../../basics
|
||||
import ../../framesorter
|
||||
import ../../stream
|
||||
import ../native/connection
|
||||
|
||||
type BaseStream* = ref object of StreamState
|
||||
stream*: Opt[Stream]
|
||||
incoming*: AsyncQueue[seq[byte]]
|
||||
connection*: Ngtcp2Connection
|
||||
frameSorter*: FrameSorter
|
||||
@@ -1,60 +0,0 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import ./basestream
|
||||
import ./helpers
|
||||
|
||||
type ClosedStream* = ref object of BaseStream
|
||||
wasReset: bool
|
||||
|
||||
proc newClosedStream*(
|
||||
incoming: AsyncQueue[seq[byte]], frameSorter: FrameSorter, wasReset: bool = false
|
||||
): ClosedStream =
|
||||
ClosedStream(incoming: incoming, wasReset: wasReset)
|
||||
|
||||
method enter*(state: ClosedStream, stream: Stream) =
|
||||
setUserData(state.stream, state.connection, nil)
|
||||
|
||||
method leave*(state: ClosedStream) =
|
||||
discard
|
||||
|
||||
method read*(state: ClosedStream): Future[seq[byte]] {.async.} =
|
||||
# If stream was reset, always throw exception
|
||||
if state.wasReset:
|
||||
raise newException(ClosedStreamError, "stream was reset")
|
||||
|
||||
try:
|
||||
return state.incoming.popFirstNoWait()
|
||||
except AsyncQueueEmptyError:
|
||||
discard
|
||||
|
||||
# When no more data is available, return EOF instead of throwing exception
|
||||
return @[]
|
||||
|
||||
method write*(state: ClosedStream, bytes: seq[byte]) {.async.} =
|
||||
raise newException(ClosedStreamError, "stream is closed")
|
||||
|
||||
method close*(state: ClosedStream) {.async.} =
|
||||
discard
|
||||
|
||||
method closeWrite*(state: ClosedStream) {.async.} =
|
||||
discard
|
||||
|
||||
method closeRead*(state: ClosedStream) {.async.} =
|
||||
discard
|
||||
|
||||
method onClose*(state: ClosedStream) =
|
||||
discard
|
||||
|
||||
method isClosed*(state: ClosedStream): bool =
|
||||
true
|
||||
|
||||
method receive*(state: ClosedStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
discard
|
||||
|
||||
method reset*(state: ClosedStream) =
|
||||
discard
|
||||
|
||||
method expire*(state: ClosedStream) {.raises: [].} =
|
||||
discard
|
||||
@@ -1,24 +0,0 @@
|
||||
import chronicles
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../native/connection
|
||||
|
||||
proc allowMoreIncomingBytes*(
|
||||
stream: Opt[stream.Stream], connection: Ngtcp2Connection, amount: uint64
|
||||
) =
|
||||
let stream = stream.valueOr:
|
||||
return
|
||||
connection.extendStreamOffset(stream.id, amount)
|
||||
connection.send()
|
||||
|
||||
proc setUserData*(
|
||||
stream: Opt[stream.Stream], connection: Ngtcp2Connection, userdata: pointer
|
||||
) =
|
||||
let stream = stream.valueOr:
|
||||
return
|
||||
connection.setStreamUserData(stream.id, userdata)
|
||||
|
||||
proc expire*(stream: Opt[stream.Stream]) =
|
||||
let stream = stream.valueOr:
|
||||
return
|
||||
stream.closed.fire()
|
||||
@@ -1,105 +0,0 @@
|
||||
import ../../../basics
|
||||
import ../../framesorter
|
||||
import ../../stream
|
||||
import ../native/connection
|
||||
import ./basestream
|
||||
import ./closestream
|
||||
import ./receivestream
|
||||
import ./sendstream
|
||||
import ./helpers
|
||||
|
||||
type OpenStream* = ref object of BaseStream
|
||||
|
||||
proc newOpenStream*(connection: Ngtcp2Connection): OpenStream =
|
||||
let incomingQ = newAsyncQueue[seq[byte]]()
|
||||
OpenStream(
|
||||
connection: connection, incoming: incomingQ, frameSorter: initFrameSorter(incomingQ)
|
||||
)
|
||||
|
||||
method enter*(state: OpenStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, unsafeAddr state[])
|
||||
|
||||
method leave*(state: OpenStream) =
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: OpenStream): Future[seq[byte]] {.async.} =
|
||||
# Check for immediate EOF conditions
|
||||
if state.frameSorter.isEOF() and state.incoming.len == 0:
|
||||
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
|
||||
|
||||
# Get data from incoming queue
|
||||
let data = await state.incoming.get()
|
||||
|
||||
# If we got real data, return it with flow control update
|
||||
if data.len > 0:
|
||||
allowMoreIncomingBytes(state.stream, state.connection, data.len.uint64)
|
||||
return data
|
||||
|
||||
# If we got empty data (len == 0), check if this is EOF
|
||||
if data.len == 0 and state.frameSorter.isEOF():
|
||||
return @[] # Return EOF per RFC 9000
|
||||
|
||||
# Empty data but no EOF; continue reading for more data
|
||||
return await state.read()
|
||||
|
||||
method write*(state: OpenStream, bytes: seq[byte]): Future[void] =
|
||||
state.connection.send(state.stream.get.id, bytes)
|
||||
|
||||
method close*(state: OpenStream) {.async.} =
|
||||
# Bidirectional streams, close() only closes the send side of the stream.
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newReceiveStream(state.connection, state.incoming, state.frameSorter))
|
||||
|
||||
method closeWrite*(state: OpenStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newReceiveStream(state.connection, state.incoming, state.frameSorter))
|
||||
|
||||
method closeRead*(state: OpenStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newSendStream(state.connection, state.incoming, state.frameSorter))
|
||||
|
||||
method onClose*(state: OpenStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
# Wake up pending read() operations before switching states
|
||||
# This fixes race condition when ngtcp2 calls onClose() while read() is waiting
|
||||
try:
|
||||
state.incoming.putNoWait(@[]) # Send EOF marker to wake up pending reads
|
||||
except AsyncQueueFullError:
|
||||
# Queue is full, that's fine - there's already data to process
|
||||
discard
|
||||
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method isClosed*(state: OpenStream): bool =
|
||||
false
|
||||
|
||||
method receive*(state: OpenStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
state.frameSorter.insert(offset, bytes, isFin)
|
||||
|
||||
if state.frameSorter.isComplete():
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.closed.fire()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method reset*(state: OpenStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true))
|
||||
|
||||
method expire*(state: OpenStream) {.raises: [].} =
|
||||
expire(state.stream)
|
||||
@@ -1,108 +0,0 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import ../native/connection
|
||||
import ./basestream
|
||||
import ./closestream
|
||||
import ./helpers
|
||||
|
||||
type ReceiveStream* = ref object of BaseStream
|
||||
|
||||
proc newReceiveStream*(
|
||||
connection: Ngtcp2Connection,
|
||||
incoming: AsyncQueue[seq[byte]],
|
||||
frameSorter: FrameSorter,
|
||||
): ReceiveStream =
|
||||
ReceiveStream(connection: connection, incoming: incoming, frameSorter: frameSorter)
|
||||
|
||||
method enter*(state: ReceiveStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, unsafeAddr state[])
|
||||
|
||||
method leave*(state: ReceiveStream) =
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: ReceiveStream): Future[seq[byte]] {.async.} =
|
||||
# Check for immediate EOF conditions
|
||||
if state.frameSorter.isEOF() and state.incoming.len == 0:
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
|
||||
|
||||
# Get data from incoming queue
|
||||
let data = await state.incoming.get()
|
||||
|
||||
# If we got real data, return it with flow control update
|
||||
if data.len > 0:
|
||||
allowMoreIncomingBytes(state.stream, state.connection, data.len.uint64)
|
||||
return data
|
||||
|
||||
# If we got empty data (len == 0), check if this is EOF
|
||||
if data.len == 0 and state.frameSorter.isEOF():
|
||||
# This is EOF - stream has been closed with FIN bit from remote
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
# If local read is also closed, switch to ClosedStream
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
return @[] # Return EOF per RFC 9000
|
||||
|
||||
# Empty data but no EOF; continue reading for more data
|
||||
return await state.read()
|
||||
|
||||
method write*(state: ReceiveStream, bytes: seq[byte]) {.async.} =
|
||||
raise newException(ClosedStreamError, "write side is closed")
|
||||
|
||||
method close*(state: ReceiveStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method closeWrite*(state: ReceiveStream) {.async.} =
|
||||
discard
|
||||
|
||||
method closeRead*(state: ReceiveStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method onClose*(state: ReceiveStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
# Wake up pending read() operations before switching states
|
||||
# This fixes race condition when ngtcp2 calls onClose() while read() is waiting
|
||||
try:
|
||||
state.incoming.putNoWait(@[]) # Send EOF marker to wake up pending reads
|
||||
except AsyncQueueFullError:
|
||||
# Queue is full, that's fine - there's already data to process
|
||||
discard
|
||||
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method isClosed*(state: ReceiveStream): bool =
|
||||
false
|
||||
|
||||
method receive*(state: ReceiveStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
state.frameSorter.insert(offset, bytes, isFin)
|
||||
|
||||
if state.frameSorter.isComplete():
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.closed.fire()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method reset*(state: ReceiveStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true))
|
||||
|
||||
method expire*(state: ReceiveStream) {.raises: [].} =
|
||||
expire(state.stream)
|
||||
@@ -1,68 +0,0 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import ../native/connection
|
||||
import ./basestream
|
||||
import ./closestream
|
||||
import ./helpers
|
||||
|
||||
type SendStream* = ref object of BaseStream
|
||||
|
||||
proc newSendStream*(
|
||||
connection: Ngtcp2Connection,
|
||||
incoming: AsyncQueue[seq[byte]],
|
||||
frameSorter: FrameSorter,
|
||||
): SendStream =
|
||||
SendStream(connection: connection, incoming: incoming, frameSorter: frameSorter)
|
||||
|
||||
method enter*(state: SendStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
setUserData(state.stream, state.connection, unsafeAddr state[])
|
||||
|
||||
method leave*(state: SendStream) =
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: SendStream): Future[seq[byte]] {.async.} =
|
||||
raise newException(ClosedStreamError, "read side is closed")
|
||||
|
||||
method write*(state: SendStream, bytes: seq[byte]) {.async.} =
|
||||
await state.connection.send(state.stream.get.id, bytes)
|
||||
|
||||
method close*(state: SendStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method closeWrite*(state: SendStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method closeRead*(stream: SendStream) {.async.} =
|
||||
discard
|
||||
|
||||
method onClose*(state: SendStream) =
|
||||
discard
|
||||
|
||||
method isClosed*(state: SendStream): bool =
|
||||
false
|
||||
|
||||
method receive*(state: SendStream, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
discard
|
||||
|
||||
method reset*(state: SendStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true))
|
||||
|
||||
method expire*(state: SendStream) {.raises: [].} =
|
||||
expire(state.stream)
|
||||
24
quic/transport/ngtcp2/streamstate/basestate.nim
Normal file
24
quic/transport/ngtcp2/streamstate/basestate.nim
Normal file
@@ -0,0 +1,24 @@
|
||||
import ../../../basics
|
||||
import ../../framesorter
|
||||
import ../../stream
|
||||
import ../native/connection
|
||||
|
||||
type BaseStreamState* = ref object of StreamState
|
||||
stream*: Opt[Stream]
|
||||
incoming*: AsyncQueue[seq[byte]]
|
||||
connection*: Ngtcp2Connection
|
||||
frameSorter*: FrameSorter
|
||||
|
||||
proc setUserData*(state: BaseStreamState, stream: stream.Stream) =
|
||||
state.connection.setStreamUserData(stream.id, unsafeAddr state[])
|
||||
|
||||
method expire*(state: BaseStreamState) {.raises: [].} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.closed.fire()
|
||||
|
||||
proc allowMoreIncomingBytes*(state: BaseStreamState, amount: uint64) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
state.connection.extendStreamOffset(stream.id, amount)
|
||||
state.connection.send()
|
||||
67
quic/transport/ngtcp2/streamstate/closestate.nim
Normal file
67
quic/transport/ngtcp2/streamstate/closestate.nim
Normal file
@@ -0,0 +1,67 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import ../native/connection
|
||||
import ./basestate
|
||||
|
||||
type ClosedStreamState* = ref object of BaseStreamState
|
||||
wasReset: bool
|
||||
|
||||
proc newClosedStreamState*(
|
||||
base: BaseStreamState, wasReset: bool = false
|
||||
): ClosedStreamState =
|
||||
ClosedStreamState(
|
||||
connection: base.connection,
|
||||
incoming: base.incoming,
|
||||
frameSorter: base.frameSorter,
|
||||
wasReset: wasReset,
|
||||
)
|
||||
|
||||
method enter*(state: ClosedStreamState, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
state.setUserData(stream)
|
||||
state.frameSorter.close()
|
||||
|
||||
method leave*(state: ClosedStreamState) =
|
||||
doAssert false, "ClosedStreamState state should never leave"
|
||||
|
||||
method read*(state: ClosedStreamState): Future[seq[byte]] {.async.} =
|
||||
# If stream was reset, always throw exception
|
||||
if state.wasReset:
|
||||
raise newException(ClosedStreamError, "stream was reset")
|
||||
|
||||
try:
|
||||
return state.incoming.popFirstNoWait()
|
||||
except AsyncQueueEmptyError:
|
||||
discard
|
||||
|
||||
# When no more data is available, return EOF instead of throwing exception
|
||||
return @[]
|
||||
|
||||
method write*(state: ClosedStreamState, bytes: seq[byte]) {.async.} =
|
||||
raise newException(ClosedStreamError, "stream is closed")
|
||||
|
||||
method close*(state: ClosedStreamState) {.async.} =
|
||||
discard
|
||||
|
||||
method closeWrite*(state: ClosedStreamState) {.async.} =
|
||||
discard
|
||||
|
||||
method closeRead*(state: ClosedStreamState) {.async.} =
|
||||
discard
|
||||
|
||||
method onClose*(state: ClosedStreamState) =
|
||||
discard
|
||||
|
||||
method isClosed*(state: ClosedStreamState): bool =
|
||||
true
|
||||
|
||||
method receive*(
|
||||
state: ClosedStreamState, offset: uint64, bytes: seq[byte], isFin: bool
|
||||
) =
|
||||
discard
|
||||
|
||||
method reset*(state: ClosedStreamState) =
|
||||
discard
|
||||
92
quic/transport/ngtcp2/streamstate/openstate.nim
Normal file
92
quic/transport/ngtcp2/streamstate/openstate.nim
Normal file
@@ -0,0 +1,92 @@
|
||||
import ../../../basics
|
||||
import ../../framesorter
|
||||
import ../../stream
|
||||
import ../native/connection
|
||||
import ./basestate
|
||||
import ./closestate
|
||||
import ./receivestate
|
||||
import ./sendstate
|
||||
|
||||
type OpenStreamState* = ref object of BaseStreamState
|
||||
|
||||
proc newOpenStreamState*(connection: Ngtcp2Connection): OpenStreamState =
|
||||
let incomingQ = newAsyncQueue[seq[byte]]()
|
||||
OpenStreamState(
|
||||
connection: connection, incoming: incomingQ, frameSorter: initFrameSorter(incomingQ)
|
||||
)
|
||||
|
||||
method enter*(state: OpenStreamState, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
state.setUserData(stream)
|
||||
|
||||
method leave*(state: OpenStreamState) =
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: OpenStreamState): Future[seq[byte]] {.async.} =
|
||||
# Check for immediate EOF conditions
|
||||
if state.frameSorter.isEOF() and state.incoming.len == 0:
|
||||
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
|
||||
|
||||
# Get data from incoming queue
|
||||
let data = await state.incoming.get()
|
||||
|
||||
# If we got real data, return it with flow control update
|
||||
if data.len > 0:
|
||||
state.allowMoreIncomingBytes(data.len.uint64)
|
||||
return data
|
||||
|
||||
# If we got empty data (len == 0), check if this is EOF
|
||||
if data.len == 0 and state.frameSorter.isEOF():
|
||||
return @[] # Return EOF per RFC 9000
|
||||
|
||||
# Empty data but no EOF; continue reading for more data
|
||||
return await state.read()
|
||||
|
||||
method write*(state: OpenStreamState, bytes: seq[byte]): Future[void] =
|
||||
state.connection.send(state.stream.get.id, bytes)
|
||||
|
||||
method close*(state: OpenStreamState) {.async.} =
|
||||
# Bidirectional streams, close() only closes the send side of the stream.
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newReceiveStreamState(state))
|
||||
|
||||
method closeWrite*(state: OpenStreamState) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newReceiveStreamState(state))
|
||||
|
||||
method closeRead*(state: OpenStreamState) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newSendStreamState(state))
|
||||
|
||||
method onClose*(state: OpenStreamState) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method isClosed*(state: OpenStreamState): bool =
|
||||
false
|
||||
|
||||
method receive*(state: OpenStreamState, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
state.frameSorter.insert(offset, bytes, isFin)
|
||||
|
||||
if state.frameSorter.isComplete():
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.closed.fire()
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method reset*(state: OpenStreamState) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStreamState(state, wasReset = true))
|
||||
95
quic/transport/ngtcp2/streamstate/receivestate.nim
Normal file
95
quic/transport/ngtcp2/streamstate/receivestate.nim
Normal file
@@ -0,0 +1,95 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import ../native/connection
|
||||
import ./basestate
|
||||
import ./closestate
|
||||
|
||||
type ReceiveStreamState* = ref object of BaseStreamState
|
||||
|
||||
proc newReceiveStreamState*(base: BaseStreamState): ReceiveStreamState =
|
||||
ReceiveStreamState(
|
||||
connection: base.connection, incoming: base.incoming, frameSorter: base.frameSorter
|
||||
)
|
||||
|
||||
method enter*(state: ReceiveStreamState, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
state.setUserData(stream)
|
||||
|
||||
method leave*(state: ReceiveStreamState) =
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: ReceiveStreamState): Future[seq[byte]] {.async.} =
|
||||
# Check for immediate EOF conditions
|
||||
if state.frameSorter.isEOF() and state.incoming.len == 0:
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
stream.switch(newClosedStreamState(state))
|
||||
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
|
||||
|
||||
# Get data from incoming queue
|
||||
let data = await state.incoming.get()
|
||||
|
||||
# If we got real data, return it with flow control update
|
||||
if data.len > 0:
|
||||
state.allowMoreIncomingBytes(data.len.uint64)
|
||||
return data
|
||||
|
||||
# If we got empty data (len == 0), check if this is EOF
|
||||
if data.len == 0 and state.frameSorter.isEOF():
|
||||
# This is EOF - stream has been closed with FIN bit from remote
|
||||
let stream = state.stream.valueOr:
|
||||
return @[] # Already closed
|
||||
# If local read is also closed, switch to ClosedStream
|
||||
stream.switch(newClosedStreamState(state))
|
||||
return @[] # Return EOF per RFC 9000
|
||||
|
||||
# Empty data but no EOF; continue reading for more data
|
||||
return await state.read()
|
||||
|
||||
method write*(state: ReceiveStreamState, bytes: seq[byte]) {.async.} =
|
||||
raise newException(ClosedStreamError, "write side is closed")
|
||||
|
||||
method close*(state: ReceiveStreamState) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method closeWrite*(state: ReceiveStreamState) {.async.} =
|
||||
discard
|
||||
|
||||
method closeRead*(state: ReceiveStreamState) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method onClose*(state: ReceiveStreamState) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method isClosed*(state: ReceiveStreamState): bool =
|
||||
false
|
||||
|
||||
method receive*(
|
||||
state: ReceiveStreamState, offset: uint64, bytes: seq[byte], isFin: bool
|
||||
) =
|
||||
state.frameSorter.insert(offset, bytes, isFin)
|
||||
|
||||
if state.frameSorter.isComplete():
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.closed.fire()
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method reset*(state: ReceiveStreamState) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStreamState(state, wasReset = true))
|
||||
65
quic/transport/ngtcp2/streamstate/sendstate.nim
Normal file
65
quic/transport/ngtcp2/streamstate/sendstate.nim
Normal file
@@ -0,0 +1,65 @@
|
||||
import ../../../errors
|
||||
import ../../../basics
|
||||
import ../../stream
|
||||
import ../../framesorter
|
||||
import ../native/connection
|
||||
import ./basestate
|
||||
import ./closestate
|
||||
|
||||
type SendStreamState* = ref object of BaseStreamState
|
||||
|
||||
proc newSendStreamState*(base: BaseStreamState): SendStreamState =
|
||||
SendStreamState(
|
||||
connection: base.connection, incoming: base.incoming, frameSorter: base.frameSorter
|
||||
)
|
||||
|
||||
method enter*(state: SendStreamState, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
state.setUserData(stream)
|
||||
state.frameSorter.close()
|
||||
|
||||
method leave*(state: SendStreamState) =
|
||||
procCall leave(StreamState(state))
|
||||
state.stream = Opt.none(Stream)
|
||||
|
||||
method read*(state: SendStreamState): Future[seq[byte]] {.async.} =
|
||||
raise newException(ClosedStreamError, "read side is closed")
|
||||
|
||||
method write*(state: SendStreamState, bytes: seq[byte]) {.async.} =
|
||||
await state.connection.send(state.stream.get.id, bytes)
|
||||
|
||||
method close*(state: SendStreamState) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method closeWrite*(state: SendStreamState) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true) # Send FIN
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method closeRead*(stream: SendStreamState) {.async.} =
|
||||
discard
|
||||
|
||||
method onClose*(state: SendStreamState) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
stream.switch(newClosedStreamState(state))
|
||||
|
||||
method isClosed*(state: SendStreamState): bool =
|
||||
false
|
||||
|
||||
method receive*(state: SendStreamState, offset: uint64, bytes: seq[byte], isFin: bool) =
|
||||
discard
|
||||
|
||||
method reset*(state: SendStreamState) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.closed.fire()
|
||||
state.frameSorter.reset()
|
||||
stream.switch(newClosedStreamState(state, wasReset = true))
|
||||
Reference in New Issue
Block a user