mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 08:08:03 -05:00
Compare commits
2 Commits
pwhite/mas
...
async-fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
205a17d8a0 | ||
|
|
7d9a2cef69 |
@@ -358,16 +358,23 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
|
||||
value = p.rpcmessagequeue.sendPriorityQueue.len.int64, labelValues = [$p.peerId]
|
||||
)
|
||||
|
||||
proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async: (raises: []).} =
|
||||
proc sendMsgContinue(
|
||||
conn: Connection, msgFut: Future[void]
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
# Continuation for a pending `sendMsg` future from below
|
||||
#
|
||||
# conn.close() in exceptions will clean up the send connection. Next time conn is used,
|
||||
# it will be have its close flag set and thus will be recycled.
|
||||
|
||||
try:
|
||||
await msgFut
|
||||
trace "sent pubsub message to remote", conn
|
||||
except CancelledError as exc:
|
||||
await conn.close()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "Unexpected exception in sendMsgContinue", conn, description = exc.msg
|
||||
# Next time sendConn is used, it will be have its close flag set and thus
|
||||
# will be recycled
|
||||
await conn.close() # This will clean up the send connection
|
||||
await conn.close()
|
||||
|
||||
proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async: (raises: [CancelledError]).} =
|
||||
# Slow path of `sendMsg` where msg is held in memory while send connection is
|
||||
@@ -387,7 +394,7 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async: (raises: [CancelledErro
|
||||
|
||||
proc sendMsg(
|
||||
p: PubSubPeer, msg: seq[byte], useCustomConn: bool = false
|
||||
): Future[void] {.async: (raises: []).} =
|
||||
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
type ConnectionType = enum
|
||||
ctCustom
|
||||
ctSend
|
||||
@@ -407,17 +414,15 @@ proc sendMsg(
|
||||
slowPath = true
|
||||
(nil, ctSlow)
|
||||
|
||||
if not slowPath:
|
||||
trace "sending encoded msg to peer",
|
||||
conntype = $connType, conn = conn, encoded = shortLog(msg)
|
||||
let f = conn.writeLp(msg)
|
||||
if not f.completed():
|
||||
sendMsgContinue(conn, f)
|
||||
else:
|
||||
f
|
||||
else:
|
||||
if slowPath:
|
||||
trace "sending encoded msg to peer via slow path"
|
||||
sendMsgSlow(p, msg)
|
||||
await sendMsgSlow(p, msg)
|
||||
return
|
||||
|
||||
trace "sending encoded msg to peer",
|
||||
conntype = $connType, conn = conn, encoded = shortLog(msg)
|
||||
|
||||
await sendMsgContinue(conn, conn.writeLp(msg))
|
||||
|
||||
proc sendEncoded*(
|
||||
p: PubSubPeer, msg: seq[byte], isHighPriority: bool, useCustomConn: bool = false
|
||||
@@ -555,7 +560,9 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
|
||||
return true
|
||||
return false
|
||||
|
||||
proc sendNonPriorityTask(p: PubSubPeer) {.async: (raises: [CancelledError]).} =
|
||||
proc sendNonPriorityTask(
|
||||
p: PubSubPeer
|
||||
) {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
while true:
|
||||
# we send non-priority messages only if there are no pending priority messages
|
||||
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
|
||||
|
||||
@@ -195,7 +195,9 @@ proc generateNodes*(
|
||||
): seq[PubSub] =
|
||||
for i in 0 ..< num:
|
||||
let switch = newStandardSwitch(
|
||||
secureManagers = secureManagers, sendSignedPeerRecord = sendSignedPeerRecord
|
||||
secureManagers = secureManagers,
|
||||
sendSignedPeerRecord = sendSignedPeerRecord,
|
||||
transport = TransportType.QUIC,
|
||||
)
|
||||
let pubsub =
|
||||
if gossip:
|
||||
|
||||
Reference in New Issue
Block a user