fix: blocked streams (#167)

This commit is contained in:
richΛrd
2025-10-23 15:41:19 -04:00
committed by GitHub
parent 78ab34c68a
commit 20d48c29f4
4 changed files with 51 additions and 3 deletions

View File

@@ -25,6 +25,10 @@ export Ngtcp2Connection
proc destroy*(connection: Ngtcp2Connection) =
let conn = connection.conn.valueOr:
return
for blockedFut in connection.blockedStreams.values():
blockedFut.cancelSoon()
connection.expiryTimer.stop()
ngtcp2_conn_del(conn)
dealloc(connection.connref)
@@ -72,6 +76,25 @@ proc updateExpiryTimer*(connection: Ngtcp2Connection) =
else:
connection.expiryTimer.stop()
proc waitUntilUnblocked(
connection: Ngtcp2Connection, streamId: int64
) {.async: (raises: [CancelledError]).} =
if not connection.blockedStreams.hasKey(streamId):
return
try:
await connection.blockedStreams[streamId]
except KeyError:
raiseAssert "checked with hasKey"
proc extendMaxStreamData*(connection: Ngtcp2Connection, streamId: int64) =
## Unblocks any stream that might have been blocked due to flow control
try:
if connection.blockedStreams.hasKey(streamId):
connection.blockedStreams[streamId].complete()
connection.blockedStreams.del(streamId)
except KeyError:
raiseAssert "checked with hasKey"
proc trySend(
connection: Ngtcp2Connection,
buffer: var seq[byte],
@@ -101,6 +124,12 @@ proc trySend(
messageLen,
now(),
)
if length.int == NGTCP2_ERR_STREAM_DATA_BLOCKED:
connection.blockedStreams[streamId] =
Future[void].Raising([CancelledError]).init("StreamLatch")
return Datagram()
checkResult length.cint
if length == 0:
@@ -130,6 +159,10 @@ proc send(
messageLen: uint,
isFin: bool = false,
): Future[int] {.async: (raises: [CancelledError, QuicError]).} =
# Stream might be blocked, waiting in case there are multiple
# async ops trying to write to same stream
await connection.waitUntilUnblocked(streamId)
var written: int
var buffer = newSeqUninit[byte](writeBufferSize)
var datagram =
@@ -145,6 +178,7 @@ proc send(
while datagram.data.len == 0:
connection.flowing.clear()
await connection.flowing.wait()
await connection.waitUntilUnblocked(streamId)
datagram =
connection.trySend(buffer, streamId, messagePtr, messageLen, addr written, isFin)
@@ -255,6 +289,7 @@ proc close*(connection: Ngtcp2Connection): Datagram =
checkResult length.cint
buffer.setLen(length)
let ecn = ECN(packetInfo.ecn)
Datagram(data: buffer, ecn: ecn)
# TODO: should stop all event loops

View File

@@ -108,6 +108,17 @@ proc onAckedStreamDataOffset(
connection.ackSentBytes(stream_id, offset, datalen)
return 0
proc onExtendMaxStreamData(
conn: ptr ngtcp2_conn,
stream_id: int64,
max_data: uint64,
user_data: pointer,
stream_user_data: pointer,
): cint {.cdecl.} =
trace "onExtendMaxStreamData"
let connection = cast[Ngtcp2Connection](user_data)
connection.extendMaxStreamData(stream_id)
proc installStreamCallbacks*(callbacks: var ngtcp2_callbacks) =
callbacks.stream_open = onStreamOpen
callbacks.stream_close = onStreamClose
@@ -115,3 +126,4 @@ proc installStreamCallbacks*(callbacks: var ngtcp2_callbacks) =
callbacks.stream_reset = onStreamReset
callbacks.stream_stop_sending = onStreamStopSending
callbacks.acked_stream_data_offset = onAckedStreamDataOffset
callbacks.extend_max_stream_data = onExtendMaxStreamData

View File

@@ -23,6 +23,7 @@ type
ssl*: ptr SSL
connref*: ptr ngtcp2_crypto_conn_ref
pendingAckQueues*: Table[int64, PendingAckQueue]
blockedStreams*: Table[int64, Future[void].Raising([CancelledError])]
path*: Path
rng*: ref HmacDrbgContext

View File

@@ -24,7 +24,7 @@ template deferStop(listener: Listener) =
suite "Quic integration usecases":
test "client to server send and receive message":
let message = newData(50 * 1024)
let message = newData(1024 * 1024)
let address = initTAddress("127.0.0.1:12345")
proc outgoing() {.async.} =
@@ -56,7 +56,7 @@ suite "Quic integration usecases":
asyncTest "connect many clients to single server":
const connectionsCount = 2 # should be increased when bug is fixed
const msgSize = 50 * 1024
const msgSize = 1024 * 1024
let serverWg = newWaitGroup(connectionsCount)
let clientWg = newWaitGroup(connectionsCount)
let address = initTAddress("127.0.0.1:12345")
@@ -94,7 +94,7 @@ suite "Quic integration usecases":
asyncTest "connections with many streams":
const connectionsCount = 3
const streamsCount = 20
const msgSize = 50 * 1024
const msgSize = 1024 * 1024
let serverWg = newWaitGroup(connectionsCount)
let clientWg = newWaitGroup(connectionsCount)
let address = initTAddress("127.0.0.1:12345")