diff --git a/.gitignore b/.gitignore index 8a7a7bb..0ae18d4 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,9 @@ # Second rule is here to un-ignores all files with extension, # because it appears that vs code is skipping text search is some tests files without these rules. tests/**/test*[^.]* -!tests/**/*.* \ No newline at end of file +!tests/**/*.* + +# nimble local setup +nimble.develop +nimble.paths +nimbledeps diff --git a/quic/api.nim b/quic/api.nim index f07d081..679b8f9 100644 --- a/quic/api.nim +++ b/quic/api.nim @@ -112,8 +112,7 @@ proc dial*( ) var connection: Connection proc onReceive(udp: DatagramTransport, remote: TransportAddress) {.async.} = - let datagram = Datagram(data: udp.getMessage()) - connection.receive(datagram) + connection.receive(Datagram(data: udp.getMessage())) let udp = newDatagramTransport(onReceive) connection = newOutgoingConnection(tlsBackend, udp, address, self.rng) diff --git a/quic/transport/framesorter.nim b/quic/transport/framesorter.nim index a715644..7b3e043 100644 --- a/quic/transport/framesorter.nim +++ b/quic/transport/framesorter.nim @@ -2,20 +2,27 @@ import ../errors import std/tables import chronos -type FrameSorter* = object +type FrameSorter* = ref object of RootRef buffer*: Table[int64, byte] # sparse byte storage emitPos*: int64 # where to emit data from incoming*: AsyncQueue[seq[byte]] totalBytes*: Opt[int64] # contains total bytes for frame; and is known once a FIN is received + closed: bool proc initFrameSorter*(incoming: AsyncQueue[seq[byte]]): FrameSorter = - result.incoming = incoming - result.buffer = initTable[int64, byte]() - result.emitPos = 0 - result.totalBytes = Opt.none(int64) + return FrameSorter( + incoming: incoming, + buffer: initTable[int64, byte](), + emitPos: 0, + totalBytes: Opt.none(int64), + closed: false, + ) proc isEOF*(fs: FrameSorter): bool = + if fs.closed: + return true + if fs.totalBytes.isNone: return false @@ -50,9 +57,18 @@ proc emitBufferedData(fs: var FrameSorter) {.raises: [QuicError].} = fs.putToQueue(emitData) +proc close*(fs: var FrameSorter) = + if fs.closed: + return + fs.closed = true + fs.sendEof() + proc insert*( fs: var FrameSorter, offset: uint64, data: seq[byte], isFin: bool ) {.raises: [QuicError].} = + if fs.closed: + return + if isFin: fs.totalBytes = Opt.some(offset.int64 + max(data.len - 1, 0)) defer: @@ -99,6 +115,9 @@ proc reset*(fs: var FrameSorter) = fs.emitPos = 0 proc isComplete*(fs: FrameSorter): bool = + if fs.closed: + return true + if fs.totalBytes.isNone: return false diff --git a/quic/transport/ngtcp2/connection/openstate.nim b/quic/transport/ngtcp2/connection/openstate.nim index 2babe18..611b9de 100644 --- a/quic/transport/ngtcp2/connection/openstate.nim +++ b/quic/transport/ngtcp2/connection/openstate.nim @@ -114,10 +114,9 @@ method receive(state: OpenConnection, datagram: Datagram) = errMsg = exc.msg trace "ngtcp2 error on receive", code = errCode, msg = errMsg finally: - var isDraining = state.ngtcp2Connection.isDraining let quicConnection = state.quicConnection.valueOr: return - if isDraining: + if state.ngtcp2Connection.isDraining: let ids = state.ids let duration = state.ngtcp2Connection.closingDuration() let draining = newDrainingConnection(ids, duration, state.derCertificates) diff --git a/quic/transport/ngtcp2/native/streams.nim b/quic/transport/ngtcp2/native/streams.nim index e068e29..04a9510 100644 --- a/quic/transport/ngtcp2/native/streams.nim +++ b/quic/transport/ngtcp2/native/streams.nim @@ -1,12 +1,12 @@ import ngtcp2 import ../../../helpers/openarray import ../../stream -import ../stream/openstream +import ../streamstate/openstate import ./connection import chronicles proc newStream(connection: Ngtcp2Connection, id: int64): Stream = - newStream(id, newOpenStream(connection)) + newStream(id, newOpenStreamState(connection)) proc openStream*(connection: Ngtcp2Connection, unidirectional: bool): Stream = var id: int64 diff --git a/quic/transport/ngtcp2/stream/basestream.nim b/quic/transport/ngtcp2/stream/basestream.nim deleted file mode 100644 index 240966b..0000000 --- a/quic/transport/ngtcp2/stream/basestream.nim +++ /dev/null @@ -1,10 +0,0 @@ -import ../../../basics -import ../../framesorter -import ../../stream -import ../native/connection - -type BaseStream* = ref object of StreamState - stream*: Opt[Stream] - incoming*: AsyncQueue[seq[byte]] - connection*: Ngtcp2Connection - frameSorter*: FrameSorter diff --git a/quic/transport/ngtcp2/stream/closestream.nim b/quic/transport/ngtcp2/stream/closestream.nim deleted file mode 100644 index 1aeab49..0000000 --- a/quic/transport/ngtcp2/stream/closestream.nim +++ /dev/null @@ -1,60 +0,0 @@ -import ../../../errors -import ../../../basics -import ../../stream -import ../../framesorter -import ./basestream -import ./helpers - -type ClosedStream* = ref object of BaseStream - wasReset: bool - -proc newClosedStream*( - incoming: AsyncQueue[seq[byte]], frameSorter: FrameSorter, wasReset: bool = false -): ClosedStream = - ClosedStream(incoming: incoming, wasReset: wasReset) - -method enter*(state: ClosedStream, stream: Stream) = - setUserData(state.stream, state.connection, nil) - -method leave*(state: ClosedStream) = - discard - -method read*(state: ClosedStream): Future[seq[byte]] {.async.} = - # If stream was reset, always throw exception - if state.wasReset: - raise newException(ClosedStreamError, "stream was reset") - - try: - return state.incoming.popFirstNoWait() - except AsyncQueueEmptyError: - discard - - # When no more data is available, return EOF instead of throwing exception - return @[] - -method write*(state: ClosedStream, bytes: seq[byte]) {.async.} = - raise newException(ClosedStreamError, "stream is closed") - -method close*(state: ClosedStream) {.async.} = - discard - -method closeWrite*(state: ClosedStream) {.async.} = - discard - -method closeRead*(state: ClosedStream) {.async.} = - discard - -method onClose*(state: ClosedStream) = - discard - -method isClosed*(state: ClosedStream): bool = - true - -method receive*(state: ClosedStream, offset: uint64, bytes: seq[byte], isFin: bool) = - discard - -method reset*(state: ClosedStream) = - discard - -method expire*(state: ClosedStream) {.raises: [].} = - discard diff --git a/quic/transport/ngtcp2/stream/helpers.nim b/quic/transport/ngtcp2/stream/helpers.nim deleted file mode 100644 index 3d88682..0000000 --- a/quic/transport/ngtcp2/stream/helpers.nim +++ /dev/null @@ -1,24 +0,0 @@ -import chronicles -import ../../../basics -import ../../stream -import ../native/connection - -proc allowMoreIncomingBytes*( - stream: Opt[stream.Stream], connection: Ngtcp2Connection, amount: uint64 -) = - let stream = stream.valueOr: - return - connection.extendStreamOffset(stream.id, amount) - connection.send() - -proc setUserData*( - stream: Opt[stream.Stream], connection: Ngtcp2Connection, userdata: pointer -) = - let stream = stream.valueOr: - return - connection.setStreamUserData(stream.id, userdata) - -proc expire*(stream: Opt[stream.Stream]) = - let stream = stream.valueOr: - return - stream.closed.fire() diff --git a/quic/transport/ngtcp2/stream/openstream.nim b/quic/transport/ngtcp2/stream/openstream.nim deleted file mode 100644 index af1cfe0..0000000 --- a/quic/transport/ngtcp2/stream/openstream.nim +++ /dev/null @@ -1,105 +0,0 @@ -import ../../../basics -import ../../framesorter -import ../../stream -import ../native/connection -import ./basestream -import ./closestream -import ./receivestream -import ./sendstream -import ./helpers - -type OpenStream* = ref object of BaseStream - -proc newOpenStream*(connection: Ngtcp2Connection): OpenStream = - let incomingQ = newAsyncQueue[seq[byte]]() - OpenStream( - connection: connection, incoming: incomingQ, frameSorter: initFrameSorter(incomingQ) - ) - -method enter*(state: OpenStream, stream: Stream) = - procCall enter(StreamState(state), stream) - state.stream = Opt.some(stream) - setUserData(state.stream, state.connection, unsafeAddr state[]) - -method leave*(state: OpenStream) = - procCall leave(StreamState(state)) - state.stream = Opt.none(Stream) - -method read*(state: OpenStream): Future[seq[byte]] {.async.} = - # Check for immediate EOF conditions - if state.frameSorter.isEOF() and state.incoming.len == 0: - return @[] # Return EOF immediately per RFC 9000 "Data Read" state - - # Get data from incoming queue - let data = await state.incoming.get() - - # If we got real data, return it with flow control update - if data.len > 0: - allowMoreIncomingBytes(state.stream, state.connection, data.len.uint64) - return data - - # If we got empty data (len == 0), check if this is EOF - if data.len == 0 and state.frameSorter.isEOF(): - return @[] # Return EOF per RFC 9000 - - # Empty data but no EOF; continue reading for more data - return await state.read() - -method write*(state: OpenStream, bytes: seq[byte]): Future[void] = - state.connection.send(state.stream.get.id, bytes) - -method close*(state: OpenStream) {.async.} = - # Bidirectional streams, close() only closes the send side of the stream. - let stream = state.stream.valueOr: - return - discard state.connection.send(state.stream.get.id, @[], true) # Send FIN - stream.switch(newReceiveStream(state.connection, state.incoming, state.frameSorter)) - -method closeWrite*(state: OpenStream) {.async.} = - let stream = state.stream.valueOr: - return - discard state.connection.send(state.stream.get.id, @[], true) # Send FIN - stream.switch(newReceiveStream(state.connection, state.incoming, state.frameSorter)) - -method closeRead*(state: OpenStream) {.async.} = - let stream = state.stream.valueOr: - return - stream.switch(newSendStream(state.connection, state.incoming, state.frameSorter)) - -method onClose*(state: OpenStream) = - let stream = state.stream.valueOr: - return - - # Wake up pending read() operations before switching states - # This fixes race condition when ngtcp2 calls onClose() while read() is waiting - try: - state.incoming.putNoWait(@[]) # Send EOF marker to wake up pending reads - except AsyncQueueFullError: - # Queue is full, that's fine - there's already data to process - discard - - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - -method isClosed*(state: OpenStream): bool = - false - -method receive*(state: OpenStream, offset: uint64, bytes: seq[byte], isFin: bool) = - state.frameSorter.insert(offset, bytes, isFin) - - if state.frameSorter.isComplete(): - let stream = state.stream.valueOr: - return - stream.closed.fire() - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - -method reset*(state: OpenStream) = - let stream = state.stream.valueOr: - return - - state.connection.shutdownStream(stream.id) - stream.closed.fire() - state.frameSorter.reset() - stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true)) - -method expire*(state: OpenStream) {.raises: [].} = - expire(state.stream) diff --git a/quic/transport/ngtcp2/stream/receivestream.nim b/quic/transport/ngtcp2/stream/receivestream.nim deleted file mode 100644 index 7724f65..0000000 --- a/quic/transport/ngtcp2/stream/receivestream.nim +++ /dev/null @@ -1,108 +0,0 @@ -import ../../../errors -import ../../../basics -import ../../stream -import ../../framesorter -import ../native/connection -import ./basestream -import ./closestream -import ./helpers - -type ReceiveStream* = ref object of BaseStream - -proc newReceiveStream*( - connection: Ngtcp2Connection, - incoming: AsyncQueue[seq[byte]], - frameSorter: FrameSorter, -): ReceiveStream = - ReceiveStream(connection: connection, incoming: incoming, frameSorter: frameSorter) - -method enter*(state: ReceiveStream, stream: Stream) = - procCall enter(StreamState(state), stream) - state.stream = Opt.some(stream) - setUserData(state.stream, state.connection, unsafeAddr state[]) - -method leave*(state: ReceiveStream) = - procCall leave(StreamState(state)) - state.stream = Opt.none(Stream) - -method read*(state: ReceiveStream): Future[seq[byte]] {.async.} = - # Check for immediate EOF conditions - if state.frameSorter.isEOF() and state.incoming.len == 0: - let stream = state.stream.valueOr: - return @[] # Already closed - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - return @[] # Return EOF immediately per RFC 9000 "Data Read" state - - # Get data from incoming queue - let data = await state.incoming.get() - - # If we got real data, return it with flow control update - if data.len > 0: - allowMoreIncomingBytes(state.stream, state.connection, data.len.uint64) - return data - - # If we got empty data (len == 0), check if this is EOF - if data.len == 0 and state.frameSorter.isEOF(): - # This is EOF - stream has been closed with FIN bit from remote - let stream = state.stream.valueOr: - return @[] # Already closed - # If local read is also closed, switch to ClosedStream - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - return @[] # Return EOF per RFC 9000 - - # Empty data but no EOF; continue reading for more data - return await state.read() - -method write*(state: ReceiveStream, bytes: seq[byte]) {.async.} = - raise newException(ClosedStreamError, "write side is closed") - -method close*(state: ReceiveStream) {.async.} = - let stream = state.stream.valueOr: - return - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - -method closeWrite*(state: ReceiveStream) {.async.} = - discard - -method closeRead*(state: ReceiveStream) {.async.} = - let stream = state.stream.valueOr: - return - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - -method onClose*(state: ReceiveStream) = - let stream = state.stream.valueOr: - return - - # Wake up pending read() operations before switching states - # This fixes race condition when ngtcp2 calls onClose() while read() is waiting - try: - state.incoming.putNoWait(@[]) # Send EOF marker to wake up pending reads - except AsyncQueueFullError: - # Queue is full, that's fine - there's already data to process - discard - - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - -method isClosed*(state: ReceiveStream): bool = - false - -method receive*(state: ReceiveStream, offset: uint64, bytes: seq[byte], isFin: bool) = - state.frameSorter.insert(offset, bytes, isFin) - - if state.frameSorter.isComplete(): - let stream = state.stream.valueOr: - return - stream.closed.fire() - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - -method reset*(state: ReceiveStream) = - let stream = state.stream.valueOr: - return - - state.connection.shutdownStream(stream.id) - stream.closed.fire() - state.frameSorter.reset() - stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true)) - -method expire*(state: ReceiveStream) {.raises: [].} = - expire(state.stream) diff --git a/quic/transport/ngtcp2/stream/sendstream.nim b/quic/transport/ngtcp2/stream/sendstream.nim deleted file mode 100644 index 345b665..0000000 --- a/quic/transport/ngtcp2/stream/sendstream.nim +++ /dev/null @@ -1,68 +0,0 @@ -import ../../../errors -import ../../../basics -import ../../stream -import ../../framesorter -import ../native/connection -import ./basestream -import ./closestream -import ./helpers - -type SendStream* = ref object of BaseStream - -proc newSendStream*( - connection: Ngtcp2Connection, - incoming: AsyncQueue[seq[byte]], - frameSorter: FrameSorter, -): SendStream = - SendStream(connection: connection, incoming: incoming, frameSorter: frameSorter) - -method enter*(state: SendStream, stream: Stream) = - procCall enter(StreamState(state), stream) - state.stream = Opt.some(stream) - setUserData(state.stream, state.connection, unsafeAddr state[]) - -method leave*(state: SendStream) = - procCall leave(StreamState(state)) - state.stream = Opt.none(Stream) - -method read*(state: SendStream): Future[seq[byte]] {.async.} = - raise newException(ClosedStreamError, "read side is closed") - -method write*(state: SendStream, bytes: seq[byte]) {.async.} = - await state.connection.send(state.stream.get.id, bytes) - -method close*(state: SendStream) {.async.} = - let stream = state.stream.valueOr: - return - discard state.connection.send(state.stream.get.id, @[], true) # Send FIN - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - -method closeWrite*(state: SendStream) {.async.} = - let stream = state.stream.valueOr: - return - discard state.connection.send(state.stream.get.id, @[], true) # Send FIN - stream.switch(newClosedStream(state.incoming, state.frameSorter)) - -method closeRead*(stream: SendStream) {.async.} = - discard - -method onClose*(state: SendStream) = - discard - -method isClosed*(state: SendStream): bool = - false - -method receive*(state: SendStream, offset: uint64, bytes: seq[byte], isFin: bool) = - discard - -method reset*(state: SendStream) = - let stream = state.stream.valueOr: - return - - state.connection.shutdownStream(stream.id) - stream.closed.fire() - state.frameSorter.reset() - stream.switch(newClosedStream(state.incoming, state.frameSorter, wasReset = true)) - -method expire*(state: SendStream) {.raises: [].} = - expire(state.stream) diff --git a/quic/transport/ngtcp2/streamstate/basestate.nim b/quic/transport/ngtcp2/streamstate/basestate.nim new file mode 100644 index 0000000..dd5e1ef --- /dev/null +++ b/quic/transport/ngtcp2/streamstate/basestate.nim @@ -0,0 +1,24 @@ +import ../../../basics +import ../../framesorter +import ../../stream +import ../native/connection + +type BaseStreamState* = ref object of StreamState + stream*: Opt[Stream] + incoming*: AsyncQueue[seq[byte]] + connection*: Ngtcp2Connection + frameSorter*: FrameSorter + +proc setUserData*(state: BaseStreamState, stream: stream.Stream) = + state.connection.setStreamUserData(stream.id, unsafeAddr state[]) + +method expire*(state: BaseStreamState) {.raises: [].} = + let stream = state.stream.valueOr: + return + stream.closed.fire() + +proc allowMoreIncomingBytes*(state: BaseStreamState, amount: uint64) = + let stream = state.stream.valueOr: + return + state.connection.extendStreamOffset(stream.id, amount) + state.connection.send() diff --git a/quic/transport/ngtcp2/streamstate/closestate.nim b/quic/transport/ngtcp2/streamstate/closestate.nim new file mode 100644 index 0000000..8ed4274 --- /dev/null +++ b/quic/transport/ngtcp2/streamstate/closestate.nim @@ -0,0 +1,67 @@ +import ../../../errors +import ../../../basics +import ../../stream +import ../../framesorter +import ../native/connection +import ./basestate + +type ClosedStreamState* = ref object of BaseStreamState + wasReset: bool + +proc newClosedStreamState*( + base: BaseStreamState, wasReset: bool = false +): ClosedStreamState = + ClosedStreamState( + connection: base.connection, + incoming: base.incoming, + frameSorter: base.frameSorter, + wasReset: wasReset, + ) + +method enter*(state: ClosedStreamState, stream: Stream) = + procCall enter(StreamState(state), stream) + state.stream = Opt.some(stream) + state.setUserData(stream) + state.frameSorter.close() + +method leave*(state: ClosedStreamState) = + doAssert false, "ClosedStreamState state should never leave" + +method read*(state: ClosedStreamState): Future[seq[byte]] {.async.} = + # If stream was reset, always throw exception + if state.wasReset: + raise newException(ClosedStreamError, "stream was reset") + + try: + return state.incoming.popFirstNoWait() + except AsyncQueueEmptyError: + discard + + # When no more data is available, return EOF instead of throwing exception + return @[] + +method write*(state: ClosedStreamState, bytes: seq[byte]) {.async.} = + raise newException(ClosedStreamError, "stream is closed") + +method close*(state: ClosedStreamState) {.async.} = + discard + +method closeWrite*(state: ClosedStreamState) {.async.} = + discard + +method closeRead*(state: ClosedStreamState) {.async.} = + discard + +method onClose*(state: ClosedStreamState) = + discard + +method isClosed*(state: ClosedStreamState): bool = + true + +method receive*( + state: ClosedStreamState, offset: uint64, bytes: seq[byte], isFin: bool +) = + discard + +method reset*(state: ClosedStreamState) = + discard diff --git a/quic/transport/ngtcp2/streamstate/openstate.nim b/quic/transport/ngtcp2/streamstate/openstate.nim new file mode 100644 index 0000000..07ccacb --- /dev/null +++ b/quic/transport/ngtcp2/streamstate/openstate.nim @@ -0,0 +1,92 @@ +import ../../../basics +import ../../framesorter +import ../../stream +import ../native/connection +import ./basestate +import ./closestate +import ./receivestate +import ./sendstate + +type OpenStreamState* = ref object of BaseStreamState + +proc newOpenStreamState*(connection: Ngtcp2Connection): OpenStreamState = + let incomingQ = newAsyncQueue[seq[byte]]() + OpenStreamState( + connection: connection, incoming: incomingQ, frameSorter: initFrameSorter(incomingQ) + ) + +method enter*(state: OpenStreamState, stream: Stream) = + procCall enter(StreamState(state), stream) + state.stream = Opt.some(stream) + state.setUserData(stream) + +method leave*(state: OpenStreamState) = + procCall leave(StreamState(state)) + state.stream = Opt.none(Stream) + +method read*(state: OpenStreamState): Future[seq[byte]] {.async.} = + # Check for immediate EOF conditions + if state.frameSorter.isEOF() and state.incoming.len == 0: + return @[] # Return EOF immediately per RFC 9000 "Data Read" state + + # Get data from incoming queue + let data = await state.incoming.get() + + # If we got real data, return it with flow control update + if data.len > 0: + state.allowMoreIncomingBytes(data.len.uint64) + return data + + # If we got empty data (len == 0), check if this is EOF + if data.len == 0 and state.frameSorter.isEOF(): + return @[] # Return EOF per RFC 9000 + + # Empty data but no EOF; continue reading for more data + return await state.read() + +method write*(state: OpenStreamState, bytes: seq[byte]): Future[void] = + state.connection.send(state.stream.get.id, bytes) + +method close*(state: OpenStreamState) {.async.} = + # Bidirectional streams, close() only closes the send side of the stream. + let stream = state.stream.valueOr: + return + discard state.connection.send(state.stream.get.id, @[], true) # Send FIN + stream.switch(newReceiveStreamState(state)) + +method closeWrite*(state: OpenStreamState) {.async.} = + let stream = state.stream.valueOr: + return + discard state.connection.send(state.stream.get.id, @[], true) # Send FIN + stream.switch(newReceiveStreamState(state)) + +method closeRead*(state: OpenStreamState) {.async.} = + let stream = state.stream.valueOr: + return + stream.switch(newSendStreamState(state)) + +method onClose*(state: OpenStreamState) = + let stream = state.stream.valueOr: + return + stream.switch(newClosedStreamState(state)) + +method isClosed*(state: OpenStreamState): bool = + false + +method receive*(state: OpenStreamState, offset: uint64, bytes: seq[byte], isFin: bool) = + state.frameSorter.insert(offset, bytes, isFin) + + if state.frameSorter.isComplete(): + let stream = state.stream.valueOr: + return + stream.closed.fire() + stream.switch(newClosedStreamState(state)) + +method reset*(state: OpenStreamState) = + let stream = state.stream.valueOr: + return + + state.connection.shutdownStream(stream.id) + stream.closed.fire() + state.frameSorter.reset() + stream.switch(newClosedStreamState(state, wasReset = true)) diff --git a/quic/transport/ngtcp2/streamstate/receivestate.nim b/quic/transport/ngtcp2/streamstate/receivestate.nim new file mode 100644 index 0000000..d062e0b --- /dev/null +++ b/quic/transport/ngtcp2/streamstate/receivestate.nim @@ -0,0 +1,95 @@ +import ../../../errors +import ../../../basics +import ../../stream +import ../../framesorter +import ../native/connection +import ./basestate +import ./closestate + +type ReceiveStreamState* = ref object of BaseStreamState + +proc newReceiveStreamState*(base: BaseStreamState): ReceiveStreamState = + ReceiveStreamState( + connection: base.connection, incoming: base.incoming, frameSorter: base.frameSorter + ) + +method enter*(state: ReceiveStreamState, stream: Stream) = + procCall enter(StreamState(state), stream) + state.stream = Opt.some(stream) + state.setUserData(stream) + +method leave*(state: ReceiveStreamState) = + procCall leave(StreamState(state)) + state.stream = Opt.none(Stream) + +method read*(state: ReceiveStreamState): Future[seq[byte]] {.async.} = + # Check for immediate EOF conditions + if state.frameSorter.isEOF() and state.incoming.len == 0: + let stream = state.stream.valueOr: + return @[] # Already closed + stream.switch(newClosedStreamState(state)) + return @[] # Return EOF immediately per RFC 9000 "Data Read" state + + # Get data from incoming queue + let data = await state.incoming.get() + + # If we got real data, return it with flow control update + if data.len > 0: + state.allowMoreIncomingBytes(data.len.uint64) + return data + + # If we got empty data (len == 0), check if this is EOF + if data.len == 0 and state.frameSorter.isEOF(): + # This is EOF - stream has been closed with FIN bit from remote + let stream = state.stream.valueOr: + return @[] # Already closed + # If local read is also closed, switch to ClosedStream + stream.switch(newClosedStreamState(state)) + return @[] # Return EOF per RFC 9000 + + # Empty data but no EOF; continue reading for more data + return await state.read() + +method write*(state: ReceiveStreamState, bytes: seq[byte]) {.async.} = + raise newException(ClosedStreamError, "write side is closed") + +method close*(state: ReceiveStreamState) {.async.} = + let stream = state.stream.valueOr: + return + stream.switch(newClosedStreamState(state)) + +method closeWrite*(state: ReceiveStreamState) {.async.} = + discard + +method closeRead*(state: ReceiveStreamState) {.async.} = + let stream = state.stream.valueOr: + return + stream.switch(newClosedStreamState(state)) + +method onClose*(state: ReceiveStreamState) = + let stream = state.stream.valueOr: + return + stream.switch(newClosedStreamState(state)) + +method isClosed*(state: ReceiveStreamState): bool = + false + +method receive*( + state: ReceiveStreamState, offset: uint64, bytes: seq[byte], isFin: bool +) = + state.frameSorter.insert(offset, bytes, isFin) + + if state.frameSorter.isComplete(): + let stream = state.stream.valueOr: + return + stream.closed.fire() + stream.switch(newClosedStreamState(state)) + +method reset*(state: ReceiveStreamState) = + let stream = state.stream.valueOr: + return + + state.connection.shutdownStream(stream.id) + stream.closed.fire() + state.frameSorter.reset() + stream.switch(newClosedStreamState(state, wasReset = true)) diff --git a/quic/transport/ngtcp2/streamstate/sendstate.nim b/quic/transport/ngtcp2/streamstate/sendstate.nim new file mode 100644 index 0000000..31407e2 --- /dev/null +++ b/quic/transport/ngtcp2/streamstate/sendstate.nim @@ -0,0 +1,65 @@ +import ../../../errors +import ../../../basics +import ../../stream +import ../../framesorter +import ../native/connection +import ./basestate +import ./closestate + +type SendStreamState* = ref object of BaseStreamState + +proc newSendStreamState*(base: BaseStreamState): SendStreamState = + SendStreamState( + connection: base.connection, incoming: base.incoming, frameSorter: base.frameSorter + ) + +method enter*(state: SendStreamState, stream: Stream) = + procCall enter(StreamState(state), stream) + state.stream = Opt.some(stream) + state.setUserData(stream) + state.frameSorter.close() + +method leave*(state: SendStreamState) = + procCall leave(StreamState(state)) + state.stream = Opt.none(Stream) + +method read*(state: SendStreamState): Future[seq[byte]] {.async.} = + raise newException(ClosedStreamError, "read side is closed") + +method write*(state: SendStreamState, bytes: seq[byte]) {.async.} = + await state.connection.send(state.stream.get.id, bytes) + +method close*(state: SendStreamState) {.async.} = + let stream = state.stream.valueOr: + return + discard state.connection.send(state.stream.get.id, @[], true) # Send FIN + stream.switch(newClosedStreamState(state)) + +method closeWrite*(state: SendStreamState) {.async.} = + let stream = state.stream.valueOr: + return + discard state.connection.send(state.stream.get.id, @[], true) # Send FIN + stream.switch(newClosedStreamState(state)) + +method closeRead*(stream: SendStreamState) {.async.} = + discard + +method onClose*(state: SendStreamState) = + let stream = state.stream.valueOr: + return + stream.switch(newClosedStreamState(state)) + +method isClosed*(state: SendStreamState): bool = + false + +method receive*(state: SendStreamState, offset: uint64, bytes: seq[byte], isFin: bool) = + discard + +method reset*(state: SendStreamState) = + let stream = state.stream.valueOr: + return + + state.connection.shutdownStream(stream.id) + stream.closed.fire() + state.frameSorter.reset() + stream.switch(newClosedStreamState(state, wasReset = true))