Compare commits

...

2 Commits

Author SHA1 Message Date
Vlado Pajić
205a17d8a0 utilize quic 2025-09-09 16:39:58 +02:00
Vlado Pajić
7d9a2cef69 fix async when sending message 2025-09-09 16:39:57 +02:00
2 changed files with 26 additions and 17 deletions

View File

@@ -358,16 +358,23 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
value = p.rpcmessagequeue.sendPriorityQueue.len.int64, labelValues = [$p.peerId]
)
proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async: (raises: []).} =
proc sendMsgContinue(
conn: Connection, msgFut: Future[void]
) {.async: (raises: [CancelledError]).} =
# Continuation for a pending `sendMsg` future from below
#
# conn.close() in exceptions will clean up the send connection. Next time conn is used,
# it will be have its close flag set and thus will be recycled.
try:
await msgFut
trace "sent pubsub message to remote", conn
except CancelledError as exc:
await conn.close()
raise exc
except CatchableError as exc:
trace "Unexpected exception in sendMsgContinue", conn, description = exc.msg
# Next time sendConn is used, it will be have its close flag set and thus
# will be recycled
await conn.close() # This will clean up the send connection
await conn.close()
proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async: (raises: [CancelledError]).} =
# Slow path of `sendMsg` where msg is held in memory while send connection is
@@ -387,7 +394,7 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async: (raises: [CancelledErro
proc sendMsg(
p: PubSubPeer, msg: seq[byte], useCustomConn: bool = false
): Future[void] {.async: (raises: []).} =
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
type ConnectionType = enum
ctCustom
ctSend
@@ -407,17 +414,15 @@ proc sendMsg(
slowPath = true
(nil, ctSlow)
if not slowPath:
trace "sending encoded msg to peer",
conntype = $connType, conn = conn, encoded = shortLog(msg)
let f = conn.writeLp(msg)
if not f.completed():
sendMsgContinue(conn, f)
else:
f
else:
if slowPath:
trace "sending encoded msg to peer via slow path"
sendMsgSlow(p, msg)
await sendMsgSlow(p, msg)
return
trace "sending encoded msg to peer",
conntype = $connType, conn = conn, encoded = shortLog(msg)
await sendMsgContinue(conn, conn.writeLp(msg))
proc sendEncoded*(
p: PubSubPeer, msg: seq[byte], isHighPriority: bool, useCustomConn: bool = false
@@ -555,7 +560,9 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
return true
return false
proc sendNonPriorityTask(p: PubSubPeer) {.async: (raises: [CancelledError]).} =
proc sendNonPriorityTask(
p: PubSubPeer
) {.async: (raises: [CancelledError, LPStreamError]).} =
while true:
# we send non-priority messages only if there are no pending priority messages
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()

View File

@@ -195,7 +195,9 @@ proc generateNodes*(
): seq[PubSub] =
for i in 0 ..< num:
let switch = newStandardSwitch(
secureManagers = secureManagers, sendSignedPeerRecord = sendSignedPeerRecord
secureManagers = secureManagers,
sendSignedPeerRecord = sendSignedPeerRecord,
transport = TransportType.QUIC,
)
let pubsub =
if gossip: