From 128645547dd91235ca9ba2f91698a6639430a904 Mon Sep 17 00:00:00 2001 From: MorganaFuture <103630661+MorganaFuture@users.noreply.github.com> Date: Fri, 18 Jul 2025 00:14:31 +0300 Subject: [PATCH] feat: implement half-close functionality for QUIC streams (#83) --- quic/api.nim | 2 + quic/transport/ngtcp2/stream/closedstate.nim | 3 + quic/transport/ngtcp2/stream/openstate.nim | 15 +++++ quic/transport/stream.nim | 6 ++ tests/quic/testStreams.nim | 60 ++++++++++++++++++++ 5 files changed, 86 insertions(+) diff --git a/quic/api.nim b/quic/api.nim index c302dea..cd20d30 100644 --- a/quic/api.nim +++ b/quic/api.nim @@ -8,6 +8,7 @@ import ./connection import ./udp/datagram import ./errors import ./transport/tlsbackend +import ./transport/stream import ./helpers/rand export Listener @@ -20,6 +21,7 @@ export remoteAddress export incomingStream export read export write +export closeWrite export stop export drop export close diff --git a/quic/transport/ngtcp2/stream/closedstate.nim b/quic/transport/ngtcp2/stream/closedstate.nim index d9bf511..c1d7167 100644 --- a/quic/transport/ngtcp2/stream/closedstate.nim +++ b/quic/transport/ngtcp2/stream/closedstate.nim @@ -39,6 +39,9 @@ method write*(state: ClosedStream, bytes: seq[byte]) {.async.} = method close*(state: ClosedStream) {.async.} = discard +method closeWrite*(state: ClosedStream) {.async.} = + discard + method onClose*(state: ClosedStream) = discard diff --git a/quic/transport/ngtcp2/stream/openstate.nim b/quic/transport/ngtcp2/stream/openstate.nim index b67066f..6d825ab 100644 --- a/quic/transport/ngtcp2/stream/openstate.nim +++ b/quic/transport/ngtcp2/stream/openstate.nim @@ -15,6 +15,7 @@ type OpenStream* = ref object of StreamState connection*: Ngtcp2Connection frameSorter*: FrameSorter closeFut*: Future[string] + writeFinSent*: bool proc newOpenStream*(connection: Ngtcp2Connection): OpenStream = let incomingQ = newAsyncQueue[seq[byte]]() @@ -23,6 +24,7 @@ proc newOpenStream*(connection: Ngtcp2Connection): OpenStream = incoming: incomingQ, closeFut: newFuture[string](), frameSorter: initFrameSorter(incomingQ), + writeFinSent: false, ) method enter*(state: OpenStream, stream: Stream) = @@ -53,6 +55,10 @@ method read*(state: OpenStream): Future[seq[byte]] {.async.} = raise newException(StreamError, closeReason) method write*(state: OpenStream, bytes: seq[byte]): Future[void] = + if state.writeFinSent: + let fut = newFuture[void]() + fut.fail(newException(StreamError, "write side is closed")) + return fut # let stream = state.stream.valueOr: # raise newException(QuicError, "stream is closed") # See https://github.com/status-im/nim-quic/pull/41 for more details @@ -62,8 +68,17 @@ method close*(state: OpenStream) {.async.} = let stream = state.stream.valueOr: return discard state.connection.send(state.stream.get.id, @[], true) + state.writeFinSent = true stream.switch(newClosedStream(state.incoming, state.frameSorter)) +method closeWrite*(state: OpenStream) {.async.} = + ## Close write side by sending FIN, but keep read side open + let stream = state.stream.valueOr: + return + discard state.connection.send(state.stream.get.id, @[], true) + state.writeFinSent = true + # Note: we don't switch to ClosedStream here - read side stays open + method reset*(state: OpenStream) = let stream = state.stream.valueOr: return diff --git a/quic/transport/stream.nim b/quic/transport/stream.nim index 24f669b..206b495 100644 --- a/quic/transport/stream.nim +++ b/quic/transport/stream.nim @@ -30,6 +30,9 @@ method write*(state: StreamState, bytes: seq[byte]) {.base, async.} = method close*(state: StreamState) {.base, async.} = doAssert false, "override this method" +method closeWrite*(state: StreamState) {.base, async.} = + doAssert false, "override this method" + method reset*(state: StreamState) {.base.} = doAssert false, "override this method" @@ -71,6 +74,9 @@ proc write*(stream: Stream, bytes: seq[byte]) {.async.} = proc close*(stream: Stream) {.async.} = await stream.state.close() +proc closeWrite*(stream: Stream) {.async.} = + await stream.state.closeWrite() + proc reset*(stream: Stream) = stream.state.reset() diff --git a/tests/quic/testStreams.nim b/tests/quic/testStreams.nim index 6d2a5c3..35c58df 100644 --- a/tests/quic/testStreams.nim +++ b/tests/quic/testStreams.nim @@ -182,3 +182,63 @@ suite "streams": check incoming == message await simulation.cancelAndWait() + + asyncTest "closeWrite() prevents further writes": + let simulation = simulateNetwork(client, server) + let message = @[1'u8, 2'u8, 3'u8] + + let clientStream = await client.openStream() + await clientStream.write(message) + await clientStream.closeWrite() + + # Writing after closeWrite should fail + expect QuicError: + await clientStream.write(@[4'u8, 5'u8, 6'u8]) + + await simulation.cancelAndWait() + + asyncTest "closeWrite() sends FIN but allows server to write back": + let simulation = simulateNetwork(client, server) + let clientMessage = @[1'u8, 2'u8, 3'u8] + let serverMessage = @[4'u8, 5'u8, 6'u8] + + # Client writes and closes write side + let clientStream = await client.openStream() + await clientStream.write(clientMessage) + await clientStream.closeWrite() + + # Server reads client message + let serverStream = await server.incomingStream() + let incoming = await serverStream.read() + check incoming == clientMessage + + # Server should still be able to write back + await serverStream.write(serverMessage) + + # Client should be able to read server's response + let response = await clientStream.read() + check response == serverMessage + + await simulation.cancelAndWait() + + asyncTest "closeWrite() called on closed stream does nothing": + let simulation = simulateNetwork(client, server) + + let clientStream = await client.openStream() + await clientStream.close() + + # Calling closeWrite on already closed stream should not raise + await clientStream.closeWrite() + await simulation.cancelAndWait() + + asyncTest "writing on a stream closed for writing raises error": + let simulation = simulateNetwork(client, server) + + let clientStream = await client.openStream() + await clientStream.write(@[1'u8, 2'u8, 3'u8]) + await clientStream.close() + + expect QuicError: + await clientStream.write(@[4'u8, 5'u8, 6'u8]) + + await simulation.cancelAndWait()