chore(streamstate): add switch and write to BaseStreamState (#112)

This commit is contained in:
vladopajic
2025-08-27 22:27:24 +02:00
committed by GitHub
parent 05ad80563d
commit fe02a9a5e3
4 changed files with 38 additions and 75 deletions

View File

@@ -10,14 +10,19 @@ type BaseStreamState* = ref object of StreamState
frameSorter*: FrameSorter
finSent*: bool
proc setUserData*(state: BaseStreamState, stream: stream.Stream) =
state.connection.setStreamUserData(stream.id, unsafeAddr state[])
method expire*(state: BaseStreamState) {.raises: [].} =
let stream = state.stream.valueOr:
return
stream.closed.fire()
method write*(state: BaseStreamState, bytes: seq[byte]) {.async.} =
let stream = state.stream.valueOr:
return
await state.connection.send(stream.id, bytes)
proc setUserData*(state: BaseStreamState, stream: stream.Stream) =
state.connection.setStreamUserData(stream.id, unsafeAddr state[])
proc allowMoreIncomingBytes*(state: BaseStreamState, amount: uint64) =
let stream = state.stream.valueOr:
return
@@ -31,3 +36,8 @@ proc sendFin*(state: BaseStreamState, stream: stream.Stream) =
proc reset*(state: BaseStreamState, stream: stream.Stream) =
state.connection.shutdownStream(stream.id)
proc switch*(state: BaseStreamState, newStream: StreamState) =
let stream = state.stream.valueOr:
return
stream.switch(newStream)

View File

@@ -29,45 +29,34 @@ method read*(state: OpenStreamState): Future[seq[byte]] {.async.} =
if state.frameSorter.isEOF() and state.incoming.len == 0:
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
# Get data from incoming queue
let data = await state.incoming.get()
# If we got real data, return it with flow control update
# If we got data, return it with flow control update
if data.len > 0:
state.allowMoreIncomingBytes(data.len.uint64)
return data
# If we got empty data (len == 0), check if this is EOF
if data.len == 0 and state.frameSorter.isEOF():
# Empty data (len == 0) and this is EOF
if state.frameSorter.isEOF():
return @[] # Return EOF per RFC 9000
# Empty data but no EOF; continue reading for more data
return await state.read()
method write*(state: OpenStreamState, bytes: seq[byte]): Future[void] =
let stream = state.stream.valueOr:
return
state.connection.send(stream.id, bytes)
method write*(state: OpenStreamState, bytes: seq[byte]) {.async.} =
await procCall BaseStreamState(state).write(bytes)
method close*(state: OpenStreamState) {.async.} =
let stream = state.stream.valueOr:
return
stream.switch(newReceiveStreamState(state))
state.switch(newReceiveStreamState(state))
method closeWrite*(state: OpenStreamState) {.async.} =
let stream = state.stream.valueOr:
return
stream.switch(newReceiveStreamState(state))
state.switch(newReceiveStreamState(state))
method closeRead*(state: OpenStreamState) {.async.} =
let stream = state.stream.valueOr:
return
stream.switch(newSendStreamState(state))
state.switch(newSendStreamState(state))
method onClose*(state: OpenStreamState) =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
method isClosed*(state: OpenStreamState): bool =
false
@@ -75,12 +64,5 @@ method isClosed*(state: OpenStreamState): bool =
method receive*(state: OpenStreamState, offset: uint64, bytes: seq[byte], isFin: bool) =
state.frameSorter.insert(offset, bytes, isFin)
if state.frameSorter.isComplete():
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
method reset*(state: OpenStreamState) =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state, wasReset = true))
state.switch(newClosedStreamState(state, wasReset = true))

View File

@@ -28,26 +28,19 @@ method leave*(state: ReceiveStreamState) =
method read*(state: ReceiveStreamState): Future[seq[byte]] {.async.} =
# Check for immediate EOF conditions
if state.frameSorter.isEOF() and state.incoming.len == 0:
let stream = state.stream.valueOr:
return @[] # Already closed
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
return @[] # Return EOF immediately per RFC 9000 "Data Read" state
# Get data from incoming queue
let data = await state.incoming.get()
# If we got real data, return it with flow control update
# If we got data, return it with flow control update
if data.len > 0:
state.allowMoreIncomingBytes(data.len.uint64)
return data
# If we got empty data (len == 0), check if this is EOF
if data.len == 0 and state.frameSorter.isEOF():
# This is EOF - stream has been closed with FIN bit from remote
let stream = state.stream.valueOr:
return @[] # Already closed
# If local read is also closed, switch to ClosedStream
stream.switch(newClosedStreamState(state))
# Empty data (len == 0) and this is EOF
if state.frameSorter.isEOF():
state.switch(newClosedStreamState(state))
return @[] # Return EOF per RFC 9000
# Empty data but no EOF; continue reading for more data
@@ -57,22 +50,16 @@ method write*(state: ReceiveStreamState, bytes: seq[byte]) {.async.} =
raise newException(ClosedStreamError, "write side is closed")
method close*(state: ReceiveStreamState) {.async.} =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
method closeWrite*(state: ReceiveStreamState) {.async.} =
discard
method closeRead*(state: ReceiveStreamState) {.async.} =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
method onClose*(state: ReceiveStreamState) =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
method isClosed*(state: ReceiveStreamState): bool =
false
@@ -81,13 +68,8 @@ method receive*(
state: ReceiveStreamState, offset: uint64, bytes: seq[byte], isFin: bool
) =
state.frameSorter.insert(offset, bytes, isFin)
if state.frameSorter.isComplete():
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
method reset*(state: ReceiveStreamState) =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state, wasReset = true))
state.switch(newClosedStreamState(state, wasReset = true))

View File

@@ -2,7 +2,6 @@ import ../../../errors
import ../../../basics
import ../../stream
import ../../framesorter
import ../native/connection
import ./basestate
import ./closestate
@@ -30,27 +29,19 @@ method read*(state: SendStreamState): Future[seq[byte]] {.async.} =
raise newException(ClosedStreamError, "read side is closed")
method write*(state: SendStreamState, bytes: seq[byte]) {.async.} =
let stream = state.stream.valueOr:
return
await state.connection.send(stream.id, bytes)
await procCall BaseStreamState(state).write(bytes)
method close*(state: SendStreamState) {.async.} =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
method closeWrite*(state: SendStreamState) {.async.} =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
method closeRead*(stream: SendStreamState) {.async.} =
discard
method onClose*(state: SendStreamState) =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state))
state.switch(newClosedStreamState(state))
method isClosed*(state: SendStreamState): bool =
false
@@ -59,6 +50,4 @@ method receive*(state: SendStreamState, offset: uint64, bytes: seq[byte], isFin:
discard
method reset*(state: SendStreamState) =
let stream = state.stream.valueOr:
return
stream.switch(newClosedStreamState(state, wasReset = true))
state.switch(newClosedStreamState(state, wasReset = true))