mirror of
https://github.com/vacp2p/nim-quic.git
synced 2026-01-08 21:38:05 -05:00
feat: implement half-close functionality for QUIC streams (#83)
This commit is contained in:
@@ -8,6 +8,7 @@ import ./connection
|
||||
import ./udp/datagram
|
||||
import ./errors
|
||||
import ./transport/tlsbackend
|
||||
import ./transport/stream
|
||||
import ./helpers/rand
|
||||
|
||||
export Listener
|
||||
@@ -20,6 +21,7 @@ export remoteAddress
|
||||
export incomingStream
|
||||
export read
|
||||
export write
|
||||
export closeWrite
|
||||
export stop
|
||||
export drop
|
||||
export close
|
||||
|
||||
@@ -39,6 +39,9 @@ method write*(state: ClosedStream, bytes: seq[byte]) {.async.} =
|
||||
method close*(state: ClosedStream) {.async.} =
|
||||
discard
|
||||
|
||||
method closeWrite*(state: ClosedStream) {.async.} =
|
||||
discard
|
||||
|
||||
method onClose*(state: ClosedStream) =
|
||||
discard
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ type OpenStream* = ref object of StreamState
|
||||
connection*: Ngtcp2Connection
|
||||
frameSorter*: FrameSorter
|
||||
closeFut*: Future[string]
|
||||
writeFinSent*: bool
|
||||
|
||||
proc newOpenStream*(connection: Ngtcp2Connection): OpenStream =
|
||||
let incomingQ = newAsyncQueue[seq[byte]]()
|
||||
@@ -23,6 +24,7 @@ proc newOpenStream*(connection: Ngtcp2Connection): OpenStream =
|
||||
incoming: incomingQ,
|
||||
closeFut: newFuture[string](),
|
||||
frameSorter: initFrameSorter(incomingQ),
|
||||
writeFinSent: false,
|
||||
)
|
||||
|
||||
method enter*(state: OpenStream, stream: Stream) =
|
||||
@@ -53,6 +55,10 @@ method read*(state: OpenStream): Future[seq[byte]] {.async.} =
|
||||
raise newException(StreamError, closeReason)
|
||||
|
||||
method write*(state: OpenStream, bytes: seq[byte]): Future[void] =
|
||||
if state.writeFinSent:
|
||||
let fut = newFuture[void]()
|
||||
fut.fail(newException(StreamError, "write side is closed"))
|
||||
return fut
|
||||
# let stream = state.stream.valueOr:
|
||||
# raise newException(QuicError, "stream is closed")
|
||||
# See https://github.com/status-im/nim-quic/pull/41 for more details
|
||||
@@ -62,8 +68,17 @@ method close*(state: OpenStream) {.async.} =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true)
|
||||
state.writeFinSent = true
|
||||
stream.switch(newClosedStream(state.incoming, state.frameSorter))
|
||||
|
||||
method closeWrite*(state: OpenStream) {.async.} =
|
||||
## Close write side by sending FIN, but keep read side open
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
discard state.connection.send(state.stream.get.id, @[], true)
|
||||
state.writeFinSent = true
|
||||
# Note: we don't switch to ClosedStream here - read side stays open
|
||||
|
||||
method reset*(state: OpenStream) =
|
||||
let stream = state.stream.valueOr:
|
||||
return
|
||||
|
||||
@@ -30,6 +30,9 @@ method write*(state: StreamState, bytes: seq[byte]) {.base, async.} =
|
||||
method close*(state: StreamState) {.base, async.} =
|
||||
doAssert false, "override this method"
|
||||
|
||||
method closeWrite*(state: StreamState) {.base, async.} =
|
||||
doAssert false, "override this method"
|
||||
|
||||
method reset*(state: StreamState) {.base.} =
|
||||
doAssert false, "override this method"
|
||||
|
||||
@@ -71,6 +74,9 @@ proc write*(stream: Stream, bytes: seq[byte]) {.async.} =
|
||||
proc close*(stream: Stream) {.async.} =
|
||||
await stream.state.close()
|
||||
|
||||
proc closeWrite*(stream: Stream) {.async.} =
|
||||
await stream.state.closeWrite()
|
||||
|
||||
proc reset*(stream: Stream) =
|
||||
stream.state.reset()
|
||||
|
||||
|
||||
@@ -182,3 +182,63 @@ suite "streams":
|
||||
check incoming == message
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
asyncTest "closeWrite() prevents further writes":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let message = @[1'u8, 2'u8, 3'u8]
|
||||
|
||||
let clientStream = await client.openStream()
|
||||
await clientStream.write(message)
|
||||
await clientStream.closeWrite()
|
||||
|
||||
# Writing after closeWrite should fail
|
||||
expect QuicError:
|
||||
await clientStream.write(@[4'u8, 5'u8, 6'u8])
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
asyncTest "closeWrite() sends FIN but allows server to write back":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
let clientMessage = @[1'u8, 2'u8, 3'u8]
|
||||
let serverMessage = @[4'u8, 5'u8, 6'u8]
|
||||
|
||||
# Client writes and closes write side
|
||||
let clientStream = await client.openStream()
|
||||
await clientStream.write(clientMessage)
|
||||
await clientStream.closeWrite()
|
||||
|
||||
# Server reads client message
|
||||
let serverStream = await server.incomingStream()
|
||||
let incoming = await serverStream.read()
|
||||
check incoming == clientMessage
|
||||
|
||||
# Server should still be able to write back
|
||||
await serverStream.write(serverMessage)
|
||||
|
||||
# Client should be able to read server's response
|
||||
let response = await clientStream.read()
|
||||
check response == serverMessage
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
asyncTest "closeWrite() called on closed stream does nothing":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
|
||||
let clientStream = await client.openStream()
|
||||
await clientStream.close()
|
||||
|
||||
# Calling closeWrite on already closed stream should not raise
|
||||
await clientStream.closeWrite()
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
asyncTest "writing on a stream closed for writing raises error":
|
||||
let simulation = simulateNetwork(client, server)
|
||||
|
||||
let clientStream = await client.openStream()
|
||||
await clientStream.write(@[1'u8, 2'u8, 3'u8])
|
||||
await clientStream.close()
|
||||
|
||||
expect QuicError:
|
||||
await clientStream.write(@[4'u8, 5'u8, 6'u8])
|
||||
|
||||
await simulation.cancelAndWait()
|
||||
|
||||
Reference in New Issue
Block a user