Mirror of https://github.com/vacp2p/nim-libp2p.git (synced 2026-01-10 12:58:05 -05:00)

Compare commits: fix-exc-iv...drop-old-r (17 commits)
| SHA1 |
|---|
| 6484d3fce4 |
| a04f8d2757 |
| 5d9478b0ec |
| f6775d2843 |
| 0d0ef3519f |
| a10b8af737 |
| b16ec00327 |
| df0b98bfdd |
| 451637a644 |
| 20a8e57262 |
| a1f3940c06 |
| 14d1787de8 |
| 07cab432ba |
| 0317d589ce |
| 4158849521 |
| 362c94bf34 |
| cbe70da155 |
```diff
@@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
       enablePX: false,
       bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
       overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
-      disconnectPeerAboveRateLimit: false
+      disconnectPeerAboveRateLimit: false,
+      maxDurationInNonPriorityQueue: Opt.none(Duration),
     )

 proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
```
```diff
@@ -220,6 +221,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
       for topic, info in stats[].topicInfos.mpairs:
         info.firstMessageDeliveries = 0

+  pubSubPeer.stopSendNonPriorityTask()
+
   procCall FloodSub(g).unsubscribePeer(peer)

 proc handleSubscribe*(g: GossipSub,
```
```diff
@@ -279,12 +282,28 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
   respControl.prune.add(g.handleGraft(peer, control.graft))
   let messages = g.handleIWant(peer, control.iwant)

-  if
-    respControl.prune.len > 0 or
-    respControl.iwant.len > 0 or
-    messages.len > 0:
-    # iwant and prunes from here, also messages
+  let
+    isPruneNotEmpty = respControl.prune.len > 0
+    isIWantNotEmpty = respControl.iwant.len > 0
+
+  if isPruneNotEmpty or isIWantNotEmpty:
+
+    if isIWantNotEmpty:
+      libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
+
+    if isPruneNotEmpty:
+      for prune in respControl.prune:
+        if g.knownTopics.contains(prune.topicId):
+          libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
+        else:
+          libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
+
+    trace "sending control message", msg = shortLog(respControl), peer
+    g.send(
+      peer,
+      RPCMsg(control: some(respControl)), true)

+  if messages.len > 0:
     for smsg in messages:
       for topic in smsg.topicIds:
         if g.knownTopics.contains(topic):
```
```diff
@@ -292,18 +311,11 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
         else:
           libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

-    libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
-
-    for prune in respControl.prune:
-      if g.knownTopics.contains(prune.topicId):
-        libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
-      else:
-        libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
-
-    trace "sending control message", msg = shortLog(respControl), peer
+    # iwant replies have lower priority
+    trace "sending iwant reply messages", peer
     g.send(
       peer,
-      RPCMsg(control: some(respControl), messages: messages))
+      RPCMsg(messages: messages), false)

 proc validateAndRelay(g: GossipSub,
                       msg: Message,
```
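Taken together, the two hunks above split the old single `g.send` into a high-priority send for control traffic and a separate low-priority send for the messages answering a peer's IWANT. A hypothetical helper, not part of the diff, summarizing how the call sites in this change pick the new priority flag:

```nim
# Hypothetical summary of the priority choices made at the call sites in this
# change (control/pong/publish -> high priority; relayed data and IWANT
# replies -> low priority). Not part of the nim-libp2p code base.
type SendKind = enum
  skControl    # prune/iwant responses in handleControl
  skPong       # ping replies in rpcHandler
  skPublish    # locally published messages
  skRelay      # messages forwarded in validateAndRelay
  skIWantReply # messages answering a peer's IWANT

proc isHighPriority(kind: SendKind): bool =
  case kind
  of skControl, skPong, skPublish: true
  of skRelay, skIWantReply: false

when isMainModule:
  assert isHighPriority(skPublish)
  assert not isHighPriority(skIWantReply)
```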
```diff
@@ -370,7 +382,7 @@ proc validateAndRelay(g: GossipSub,

     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
     trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
     for topic in msg.topicIds:
       if topic notin g.topics: continue
```
```diff
@@ -441,7 +453,7 @@ method rpcHandler*(g: GossipSub,
   peer.recvObservers(rpcMsg)

   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
-    g.send(peer, RPCMsg(pong: rpcMsg.ping))
+    g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
     peer.pingBudget.dec
   for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
     template sub: untyped = rpcMsg.subscriptions[i]
```
```diff
@@ -655,7 +667,7 @@ method publish*(g: GossipSub,

   g.mcache.put(msgId, msg)

-  g.broadcast(peers, RPCMsg(messages: @[msg]))
+  g.broadcast(peers, RPCMsg(messages: @[msg]), true)

   if g.knownTopics.contains(topic):
     libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
```
```diff
@@ -740,4 +752,5 @@ method getOrCreatePeer*(
   let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
   g.parameters.overheadRateLimit.withValue(overheadRateLimit):
     peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
+  peer.rpcmessagequeue.maxDurationInNonPriorityQueue = g.parameters.maxDurationInNonPriorityQueue
   return peer
```
```diff
@@ -147,6 +147,10 @@ type
     overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
     disconnectPeerAboveRateLimit*: bool

+    # The maximum duration a message can stay in the non-priority queue. If it exceeds this duration, it will be discarded
+    # as soon as it is dequeued, instead of being sent to the remote peer. The default value is none, i.e., no maximum duration.
+    maxDurationInNonPriorityQueue*: Opt[Duration]
+
   BackoffTable* = Table[string, Table[PeerId, Moment]]
   ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
```
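The new `maxDurationInNonPriorityQueue` parameter is opt-in and defaults to `Opt.none(Duration)` (see the `GossipSubParams.init` hunk above). A minimal sketch of enabling it; the module paths and the surrounding GossipSub setup are assumptions, not taken from the diff:

```nim
# Sketch only: drop stale non-priority (relayed/IWANT-reply) messages
# after 500 ms. Module paths are assumed, not part of the diff.
import chronos, results
import libp2p/protocols/pubsub/gossipsub

var params = GossipSubParams.init()
params.maxDurationInNonPriorityQueue = Opt.some(500.milliseconds)
# `params` would then be passed to the GossipSub instance at construction time.
```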
```diff
@@ -138,17 +138,18 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

   libp2p_pubsub_peers.set(p.peers.len.int64)

-proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
+proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = false) {.raises: [].} =
   ## Attempt to send `msg` to remote peer
   ##

   trace "sending pubsub message to peer", peer, msg = shortLog(msg)
-  peer.send(msg, p.anonymize)
+  asyncSpawn peer.send(msg, p.anonymize, isHighPriority)

 proc broadcast*(
   p: PubSub,
   sendPeers: auto, # Iteratble[PubSubPeer]
-  msg: RPCMsg) {.raises: [].} =
+  msg: RPCMsg,
+  isHighPriority: bool = false) {.raises: [].} =
   ## Attempt to send `msg` to the given peers

   let npeers = sendPeers.len.int64
```
```diff
@@ -195,12 +196,12 @@ proc broadcast*(

   if anyIt(sendPeers, it.hasObservers):
     for peer in sendPeers:
-      p.send(peer, msg)
+      p.send(peer, msg, isHighPriority)
   else:
     # Fast path that only encodes message once
     let encoded = encodeRpcMsg(msg, p.anonymize)
     for peer in sendPeers:
-      asyncSpawn peer.sendEncoded(encoded)
+      asyncSpawn peer.sendEncoded(encoded, isHighPriority)

 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
```
```diff
@@ -31,6 +31,12 @@ when defined(libp2p_expensive_metrics):
   declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
   declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

+  declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
+  declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
+
+  declareCounter(libp2p_gossipsub_non_priority_msgs_dropped, "the number of dropped messages in the non-priority queue", labels = ["id"])
+
 type
   PeerRateLimitError* = object of CatchableError

```
```diff
@@ -49,6 +55,20 @@ type
   DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
   OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

+  QueuedMessage* = object
+    msg*: seq[byte]
+    addedAt*: Moment
+
+  RpcMessageQueue* = ref object
+    # Tracks async tasks for sending high-priority peer-published messages.
+    sendPriorityQueue: Deque[Future[void]]
+    # Queue for lower-priority messages, like "IWANT" replies and relay messages.
+    nonPriorityQueue: AsyncQueue[QueuedMessage]
+    # Task for processing non-priority message queue.
+    sendNonPriorityTask: Future[void]
+    # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
+    maxDurationInNonPriorityQueue*: Opt[Duration]
+
   PubSubPeer* = ref object of RootObj
     getConn*: GetConn # callback to establish a new send connection
     onEvent*: OnEvent # Connectivity updates for peer
```
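The comments on `RpcMessageQueue` above describe the split: a deque that only tracks futures of already-started high-priority sends, and an `AsyncQueue` holding not-yet-sent low-priority messages that a dedicated task drains. A standalone chronos sketch, not part of the diff, of that consumer pattern with invented names:

```nim
# Standalone illustration of the AsyncQueue pattern behind nonPriorityQueue:
# producers enqueue encoded messages, one long-running task dequeues and
# "sends" them. All names here are example-only.
import chronos

proc consumer(q: AsyncQueue[string]) {.async.} =
  while true:
    let item = await q.popFirst()   # suspends until a message is available
    echo "sending: ", item

proc main() {.async.} =
  let q = newAsyncQueue[string]()
  let task = consumer(q)
  await q.addLast("iwant reply")
  await q.addLast("relayed message")
  await sleepAsync(10.milliseconds)  # let the consumer drain the queue
  task.cancel()                      # mirrors stopSendNonPriorityTask

waitFor main()
```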
```diff
@@ -70,6 +90,8 @@ type
     behaviourPenalty*: float64 # the eventual penalty score
     overheadRateLimitOpt*: Opt[TokenBucket]

+    rpcmessagequeue*: RpcMessageQueue
+
   RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
     {.gcsafe, raises: [].}
```
```diff
@@ -82,6 +104,16 @@ when defined(libp2p_agents_metrics):
       #so we have to read the parents short agent..
       p.sendConn.getWrapped().shortAgent

+proc getAgent*(peer: PubSubPeer): string =
+  return
+    when defined(libp2p_agents_metrics):
+      if peer.shortAgent.len > 0:
+        peer.shortAgent
+      else:
+        "unknown"
+    else:
+      "unknown"
+
 func hash*(p: PubSubPeer): Hash =
   p.peerId.hash

```
```diff
@@ -227,17 +259,13 @@ template sendMetrics(msg: RPCMsg): untyped =
         # metrics
         libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

-proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
-  doAssert(not isNil(p), "pubsubpeer nil!")
-
-  if msg.len <= 0:
-    debug "empty message, skipping", p, msg = shortLog(msg)
-    return
-
-  if msg.len > p.maxMessageSize:
-    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
-    return
+proc clearSendPriorityQueue(p: PubSubPeer) =
+  while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
+    discard p.rpcmessagequeue.sendPriorityQueue.popFirst()

+proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
   if p.sendConn == nil:
     # Wait for a send conn to be setup. `connectOnce` will
     # complete this even if the sendConn setup failed
```
```diff
@@ -262,6 +290,30 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =

     await conn.close() # This will clean up the send connection

+proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
+  doAssert(not isNil(p), "pubsubpeer nil!")
+
+  if msg.len <= 0:
+    debug "empty message, skipping", p, msg = shortLog(msg)
+    return
+
+  if msg.len > p.maxMessageSize:
+    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
+    return
+
+  if isHighPriority:
+    p.clearSendPriorityQueue()
+    let f = p.sendMsg(msg)
+    if not f.finished:
+      p.rpcmessagequeue.sendPriorityQueue.addLast(f)
+      when defined(libp2p_expensive_metrics):
+        libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
+  else:
+    await p.rpcmessagequeue.nonPriorityQueue.addLast(QueuedMessage(msg: msg, addedAt: Moment.now()))
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+  trace "message queued", p, msg = shortLog(msg)
+
 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
   ## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
```
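In the new `sendEncoded`, a high-priority message is sent immediately and only its unfinished future is tracked, while a low-priority message is parked in the queue; `clearSendPriorityQueue` prunes futures of sends that have already completed. A standalone sketch, not part of the diff, of that pruning pattern:

```nim
# Standalone illustration of the bookkeeping used by clearSendPriorityQueue /
# sendEncoded: in-flight high-priority sends are kept in a Deque[Future[void]]
# and finished ones are popped from the front. All names are example-only.
import std/deques
import chronos

proc fakeSend(delay: Duration): Future[void] {.async.} =
  await sleepAsync(delay)

proc main() {.async.} =
  var inFlight = initDeque[Future[void]]()
  for i in 0 ..< 5:
    let f = fakeSend((10 * i).milliseconds)
    if not f.finished:            # only track sends that did not complete synchronously
      inFlight.addLast(f)
  await sleepAsync(25.milliseconds)
  # prune completed sends from the front, as clearSendPriorityQueue does
  while inFlight.len > 0 and inFlight[0].finished:
    discard inFlight.popFirst()
  echo "still in flight: ", inFlight.len   # only the slower sends remain

waitFor main()
```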
```diff
@@ -297,7 +349,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
   else:
     trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
   # When sending messages, we take care to re-encode them with the right
   # anonymization flag to ensure that we're not penalized for sending invalid
   # or malicious data on the wire - in particular, re-encoding protects against
```
```diff
@@ -317,11 +369,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =

   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg)
+      await p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded)
+    await p.sendEncoded(encoded, isHighPriority)

 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
```
```diff
@@ -330,6 +382,49 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
       return true
   return false

+proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
+  while true:
+    # we send non-priority messages only if there are no pending priority messages
+    let queuedMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
+    while p.rpcmessagequeue.sendPriorityQueue.len > 0:
+      p.clearSendPriorityQueue()
+      # this minimizes the number of times we have to wait for something (each wait = performance cost)
+      # we will never wait for a finished future and by waiting for the last one, all that come before it are guaranteed
+      # to be finished already (since sends are processed in order).
+      if p.rpcmessagequeue.sendPriorityQueue.len > 0:
+        await p.rpcmessagequeue.sendPriorityQueue[^1]
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
+    p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue):
+      if Moment.now() - queuedMsg.addedAt >= maxDurationInNonPriorityQueue:
+        when defined(libp2p_expensive_metrics):
+          libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
+        continue
+    await p.sendMsg(queuedMsg.msg)
+
+proc startSendNonPriorityTask(p: PubSubPeer) =
+  debug "starting sendNonPriorityTask", p
+  if p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()
+
+proc stopSendNonPriorityTask*(p: PubSubPeer) =
+  if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    debug "stopping sendNonPriorityTask", p
+    p.rpcmessagequeue.sendNonPriorityTask.cancel()
+    p.rpcmessagequeue.sendNonPriorityTask = nil
+    p.rpcmessagequeue.sendPriorityQueue.clear()
+    p.rpcmessagequeue.nonPriorityQueue.clear()
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+      libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+
+proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
+  return T(
+    sendPriorityQueue: initDeque[Future[void]](),
+    nonPriorityQueue: newAsyncQueue[QueuedMessage](),
+    maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
+  )
+
 proc new*(
   T: typedesc[PubSubPeer],
   peerId: PeerId,
```
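`sendNonPriorityTask` only forwards a queued message once all pending priority sends have finished, and drops it if it sat in the queue longer than `maxDurationInNonPriorityQueue`. A standalone sketch, not part of the diff, of that age check using chronos' `Moment`:

```nim
# Standalone illustration of the age-based drop in sendNonPriorityTask:
# compare the time a message spent queued against a configured maximum.
# Types and names are example-only.
import chronos

type QueuedMsg = object
  data: seq[byte]
  addedAt: Moment

proc shouldDrop(m: QueuedMsg, maxWait: Duration): bool =
  Moment.now() - m.addedAt >= maxWait

proc demo() {.async.} =
  let m = QueuedMsg(data: @[1'u8, 2, 3], addedAt: Moment.now())
  await sleepAsync(50.milliseconds)
  echo "drop after ~50 ms wait with 20 ms budget: ", m.shouldDrop(20.milliseconds)  # true
  echo "keep with 1 s budget: ", not m.shouldDrop(1.seconds)                        # true

waitFor demo()
```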
```diff
@@ -337,7 +432,8 @@ proc new*(
   onEvent: OnEvent,
   codec: string,
   maxMessageSize: int,
-  overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
+  overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket),
+  maxDurationInNonPriorityQueue = Opt.none(Duration)): T =

   result = T(
     getConn: getConn,
```
```diff
@@ -346,17 +442,9 @@ proc new*(
     peerId: peerId,
     connectedFut: newFuture[void](),
     maxMessageSize: maxMessageSize,
-    overheadRateLimitOpt: overheadRateLimitOpt
+    overheadRateLimitOpt: overheadRateLimitOpt,
+    rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue),
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
-
-proc getAgent*(peer: PubSubPeer): string =
-  return
-    when defined(libp2p_agents_metrics):
-      if peer.shortAgent.len > 0:
-        peer.shortAgent
-      else:
-        "unknown"
-    else:
-      "unknown"
+  result.startSendNonPriorityTask()
```
```diff
@@ -9,7 +9,7 @@

 {.used.}

-import std/[options, deques, sequtils, enumerate, algorithm]
+import std/[options, deques, sequtils, enumerate, algorithm, os]
 import stew/byteutils
 import ../../libp2p/builders
 import ../../libp2p/errors
```
```diff
@@ -718,15 +718,19 @@ suite "GossipSub internal":
     await allFuturesThrowing(conns.mapIt(it.close()))
     await gossipSub.switch.stop()

-  proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
+  proc setupTest(maxDurationInNonPriorityQueue1: Opt[Duration] = Opt.none(Duration)):
+      Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
     let
       nodes = generateNodes(2, gossip = true, verifySignature = false)
     discard await allFinished(
       nodes[0].switch.start(),
       nodes[1].switch.start()
     )
+    var gossip0: GossipSub = GossipSub(nodes[0])
+    var gossip1: GossipSub = GossipSub(nodes[1])

-    await nodes[1].switch.connect(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)
+    gossip1.parameters.maxDurationInNonPriorityQueue = maxDurationInNonPriorityQueue1
+    await gossip1.switch.connect(gossip0.switch.peerInfo.peerId, gossip0.switch.peerInfo.addrs)

     var receivedMessages = new(HashSet[seq[byte]])

```
```diff
@@ -736,12 +740,10 @@ suite "GossipSub internal":
     proc handlerB(topic: string, data: seq[byte]) {.async.} =
       discard

-    nodes[0].subscribe("foobar", handlerA)
-    nodes[1].subscribe("foobar", handlerB)
+    gossip0.subscribe("foobar", handlerA)
+    gossip1.subscribe("foobar", handlerB)
     await waitSubGraph(nodes, "foobar")

-    var gossip0: GossipSub = GossipSub(nodes[0])
-    var gossip1: GossipSub = GossipSub(nodes[1])

     return (gossip0, gossip1, receivedMessages)

```
```diff
@@ -844,3 +846,18 @@ suite "GossipSub internal":
     check receivedMessages[].len == 1

     await teardownTest(gossip0, gossip1)
+
+  asyncTest "e2e - drop msg if it is in the non-priority queue for too long":
+    # This test checks that a message which waits in the non-priority queue longer than the configured maximum is dropped.
+    # Expected: only the second message should be received.
+    let maxDurationInNonPriorityQueueGossip1 = 100.millis
+    let (gossip0, gossip1, receivedMessages) = await setupTest(Opt.some(maxDurationInNonPriorityQueueGossip1))
+
+    let topic = "foobar"
+    gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]), false)
+    sleep(100) # pause all tasks to ensure that the message stays in the non-priority queue longer than maxDurationInNonPriorityQueueGossip1
+    gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](36))]), false)
+    await sleepAsync(100.milliseconds) # wait for the messages to be processed
+    check: receivedMessages[].len == 1 # only the second message should be received
+
+    await teardownTest(gossip0, gossip1)
```