From 86695b55bb7fd100560da99b94e78fc33786648c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Kami=C5=84ski?= Date: Thu, 29 May 2025 12:44:21 +0100 Subject: [PATCH] test(gossipsub): include missing test files and handle flaky tests (#1416) Co-authored-by: vladopajic --- tests/helpers.nim | 8 +- tests/pubsub/testgossipsubcontrolmessages.nim | 3 +- tests/pubsub/testgossipsubgossip.nim | 15 +- tests/pubsub/testgossipsubheartbeat.nim | 166 +++++++++--------- tests/pubsub/testgossipsubmeshmanagement.nim | 67 +++---- tests/pubsub/testgossipsubmessagehandling.nim | 4 +- tests/pubsub/testpubsub.nim | 6 +- tests/pubsub/utils.nim | 45 ++--- tests/testhelpers.nim | 4 +- 9 files changed, 144 insertions(+), 174 deletions(-) diff --git a/tests/helpers.nim b/tests/helpers.nim index 97e835e3e..cf7dfd1c1 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -94,7 +94,9 @@ proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T = testBufferStream.initStream() testBufferStream -macro checkUntilCustomTimeout*(timeout: Duration, code: untyped): untyped = +macro checkUntilCustomTimeout*( + timeout: Duration, sleepInterval: Duration, code: untyped +): untyped = ## Periodically checks a given condition until it is true or a timeout occurs. ## ## `code`: untyped - A condition expression that should eventually evaluate to true. @@ -147,7 +149,7 @@ macro checkUntilCustomTimeout*(timeout: Duration, code: untyped): untyped = if `combinedBoolExpr`: return else: - await sleepAsync(100.millis) + await sleepAsync(`sleepInterval`) await checkExpiringInternal() @@ -173,7 +175,7 @@ macro checkUntilTimeout*(code: untyped): untyped = ## b == 1 ## ``` result = quote: - checkUntilCustomTimeout(10.seconds, `code`) + checkUntilCustomTimeout(10.seconds, 100.milliseconds, `code`) proc unorderedCompare*[T](a, b: seq[T]): bool = if a == b: diff --git a/tests/pubsub/testgossipsubcontrolmessages.nim b/tests/pubsub/testgossipsubcontrolmessages.nim index 3066c3b72..d5feae0c3 100644 --- a/tests/pubsub/testgossipsubcontrolmessages.nim +++ b/tests/pubsub/testgossipsubcontrolmessages.nim @@ -1,11 +1,10 @@ {.used.} import std/[sequtils] -import stew/byteutils import utils import chronicles import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable] -import ../helpers, ../utils/[futures] +import ../helpers suite "GossipSub Control Messages": teardown: diff --git a/tests/pubsub/testgossipsubgossip.nim b/tests/pubsub/testgossipsubgossip.nim index c5b53df99..765979afc 100644 --- a/tests/pubsub/testgossipsubgossip.nim +++ b/tests/pubsub/testgossipsubgossip.nim @@ -141,6 +141,7 @@ suite "GossipSub Gossip Protocol": topic = "foobar" dValues = DValues(dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1)) nodes = generateNodes(numberOfNodes, gossip = true, dValues = some(dValues)) + .toGossipSub() startNodesAndDeferStop(nodes) @@ -152,19 +153,17 @@ suite "GossipSub Gossip Protocol": # And subscribed to the same topic subscribeAllNodes(nodes, topic, voidTopicHandler) - await waitForPeersInTable( - nodes, topic, newSeqWith(numberOfNodes, 4), PeerTableType.Gossipsub - ) + + checkUntilCustomTimeout(500.milliseconds, 20.milliseconds): + nodes.allIt(it.gossipsub.getOrDefault(topic).len == numberOfNodes - 1) # When node 0 sends a message - check (await nodes[0].publish(topic, "Hello!".toBytes())) > 0 - await waitForHeartbeat() + tryPublish await nodes[0].publish(topic, "Hello!".toBytes()), 1 # At least one of the nodes should have received an iHave message # The check is made this way because the mesh structure changes from run to run - let receivedIHaves = messages[].mapIt(it[].len) - check: - anyIt(receivedIHaves, it > 0) + checkUntilCustomTimeout(500.milliseconds, 20.milliseconds): + messages[].mapIt(it[].len).anyIt(it > 0) asyncTest "adaptive gossip dissemination, dLazy and gossipFactor to 0": let diff --git a/tests/pubsub/testgossipsubheartbeat.nim b/tests/pubsub/testgossipsubheartbeat.nim index ced7e2aba..564dffd58 100644 --- a/tests/pubsub/testgossipsubheartbeat.nim +++ b/tests/pubsub/testgossipsubheartbeat.nim @@ -9,34 +9,31 @@ suite "GossipSub Heartbeat": teardown: checkTrackers() + const + timeout = 1.seconds + interval = 50.milliseconds + asyncTest "Mesh is rebalanced during heartbeat - pruning peers": const numberOfNodes = 10 topic = "foobar" + heartbeatInterval = 200.milliseconds let - nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub() + nodes = generateNodes( + numberOfNodes, gossip = true, heartbeatInterval = heartbeatInterval + ) + .toGossipSub() node0 = nodes[0] startNodesAndDeferStop(nodes) - # Observe timestamps of received prune messages - var lastPrune = Moment.now() - for i in 0 ..< numberOfNodes: - let pubsubObserver = PubSubObserver( - onRecv: proc(peer: PubSubPeer, msgs: var RPCMsg) = - if msgs.control.isSome: - for msg in msgs.control.get.prune: - lastPrune = Moment.now() - ) - nodes[i].addObserver(pubsubObserver) - # Nodes are connected to Node0 for i in 1 ..< numberOfNodes: await connectNodes(node0, nodes[i]) subscribeAllNodes(nodes, topic, voidTopicHandler) - await waitForHeartbeat() - check: - node0.mesh[topic].len == numberOfNodes - 1 + + checkUntilCustomTimeout(timeout, interval): + node0.mesh.getOrDefault(topic).len == numberOfNodes - 1 # When DValues of Node0 are updated to lower than defaults const @@ -54,21 +51,9 @@ suite "GossipSub Heartbeat": ) node0.parameters.applyDValues(newDValues) - # Waiting 2 hearbeats to finish pruning (2 peers first and then 3) - # Comparing timestamps of last received prune to confirm heartbeat interval - await waitForHeartbeat() - let after1stHearbeat = lastPrune - - await waitForHeartbeat() - let after2ndHearbeat = lastPrune - - let measuredHeartbeat = after2ndHearbeat - after1stHearbeat - let heartbeatDiff = measuredHeartbeat - TEST_GOSSIPSUB_HEARTBEAT_INTERVAL - # Then mesh of Node0 is rebalanced and peers are pruned to adapt to new values - check: + checkUntilCustomTimeout(timeout, interval): node0.mesh[topic].len >= newDLow and node0.mesh[topic].len <= newDHigh - heartbeatDiff < 2.milliseconds # 2ms margin asyncTest "Mesh is rebalanced during heartbeat - grafting new peers": const @@ -76,6 +61,7 @@ suite "GossipSub Heartbeat": topic = "foobar" dLow = 3 dHigh = 4 + heartbeatInterval = 200.milliseconds let nodes = generateNodes( numberOfNodes, @@ -84,6 +70,7 @@ suite "GossipSub Heartbeat": DValues(dLow: some(dLow), dHigh: some(dHigh), d: some(3), dOut: some(1)) ), pruneBackoff = 20.milliseconds, + heartbeatInterval = heartbeatInterval, ) .toGossipSub() node0 = nodes[0] @@ -94,19 +81,16 @@ suite "GossipSub Heartbeat": for i in 1 ..< numberOfNodes: await connectNodes(node0, nodes[i]) subscribeAllNodes(nodes, topic, voidTopicHandler) - await waitForHeartbeat() - check: - node0.mesh[topic].len >= dLow and node0.mesh[topic].len <= dHigh + checkUntilCustomTimeout(timeout, interval): + node0.mesh.getOrDefault(topic).len >= dLow and + node0.mesh.getOrDefault(topic).len <= dHigh # When peers of Node0 mesh are disconnected let peersToDisconnect = node0.mesh[topic].toSeq()[1 .. ^1].mapIt(it.peerId) findAndUnsubscribePeers(nodes, peersToDisconnect, topic, voidTopicHandler) - # Then mesh of Node0 is rebalanced and new peers are added - await waitForHeartbeat() - - check: + checkUntilCustomTimeout(timeout, interval): node0.mesh[topic].len >= dLow and node0.mesh[topic].len <= dHigh node0.mesh[topic].toSeq().allIt(it.peerId notin peersToDisconnect) @@ -114,6 +98,7 @@ suite "GossipSub Heartbeat": const numberOfNodes = 10 topic = "foobar" + heartbeatInterval = 200.milliseconds let nodes = generateNodes( numberOfNodes, @@ -130,6 +115,7 @@ suite "GossipSub Heartbeat": ), pruneBackoff = 20.milliseconds, opportunisticGraftThreshold = 600, + heartbeatInterval = heartbeatInterval, ) .toGossipSub() node0 = nodes[0] @@ -140,7 +126,7 @@ suite "GossipSub Heartbeat": for i in 1 ..< numberOfNodes: await connectNodes(node0, nodes[i]) subscribeAllNodes(nodes, topic, voidTopicHandler) - await waitForHeartbeat() + await waitForHeartbeat(heartbeatInterval) # Keep track of initial mesh of Node0 let startingMesh = node0.mesh[topic].toSeq() @@ -159,7 +145,7 @@ suite "GossipSub Heartbeat": expectedGrafts &= peer # Then during heartbeat Peers with lower than median scores are pruned and max 2 Peers are grafted - await waitForHeartbeat() + await waitForHeartbeat(heartbeatInterval) let actualGrafts = node0.mesh[topic].toSeq().filterIt(it notin startingMesh) const maxOpportunisticGraftsPerHeartbeat = 2 @@ -171,7 +157,13 @@ suite "GossipSub Heartbeat": const numberOfNodes = 10 topic = "foobar" - let nodes = generateNodes(numberOfNodes, gossip = true, fanoutTTL = 30.milliseconds) + heartbeatInterval = 200.milliseconds + let nodes = generateNodes( + numberOfNodes, + gossip = true, + fanoutTTL = 60.milliseconds, + heartbeatInterval = heartbeatInterval, + ) .toGossipSub() startNodesAndDeferStop(nodes) @@ -180,29 +172,31 @@ suite "GossipSub Heartbeat": # All nodes but Node0 are subscribed to the topic for node in nodes[1 .. ^1]: node.subscribe(topic, voidTopicHandler) - await waitForHeartbeat() + await waitForHeartbeat(heartbeatInterval) # When Node0 sends a message to the topic let node0 = nodes[0] - tryPublish await node0.publish(topic, newSeq[byte](10000)), 1 + tryPublish await node0.publish(topic, newSeq[byte](10000)), 3 # Then Node0 fanout peers are populated let maxFanoutPeers = node0.parameters.d - await waitForPeersInTable(node0, topic, maxFanoutPeers, PeerTableType.Fanout) - check: + checkUntilCustomTimeout(timeout, interval): node0.fanout.hasKey(topic) and node0.fanout[topic].len == maxFanoutPeers - # And after heartbeat (60ms) Node0 fanout peers are dropped (because fanoutTTL=30ms) - await waitForHeartbeat() - check: + # And after heartbeat Node0 fanout peers are dropped (because fanoutTTL < heartbeatInterval) + checkUntilCustomTimeout(timeout, interval): not node0.fanout.hasKey(topic) asyncTest "Fanout maintenance during heartbeat - fanout peers are replenished": const numberOfNodes = 10 topic = "foobar" + heartbeatInterval = 200.milliseconds let - nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub() + nodes = generateNodes( + numberOfNodes, gossip = true, heartbeatInterval = heartbeatInterval + ) + .toGossipSub() node0 = nodes[0] startNodesAndDeferStop(nodes) @@ -211,31 +205,38 @@ suite "GossipSub Heartbeat": # All nodes but Node0 are subscribed to the topic for node in nodes[1 .. ^1]: node.subscribe(topic, voidTopicHandler) - await waitForHeartbeat() + await waitForHeartbeat(heartbeatInterval) # When Node0 sends a message to the topic tryPublish await node0.publish(topic, newSeq[byte](10000)), 1 # Then Node0 fanout peers are populated let maxFanoutPeers = node0.parameters.d - await waitForPeersInTable(node0, topic, maxFanoutPeers, PeerTableType.Fanout) + checkUntilCustomTimeout(timeout, interval): + node0.fanout[topic].len == maxFanoutPeers # When all peers but first one of Node0 fanout are disconnected let peersToDisconnect = node0.fanout[topic].toSeq()[1 .. ^1].mapIt(it.peerId) findAndUnsubscribePeers(nodes, peersToDisconnect, topic, voidTopicHandler) - await waitForPeersInTable(node0, topic, 1, PeerTableType.Fanout) # Then Node0 fanout peers are replenished during heartbeat - await waitForHeartbeat() - check: - # expecting 10[numberOfNodes] - 1[Node0] - (6[maxFanoutPeers] - 1[first peer not disconnected]) = 4 - node0.fanout[topic].len == numberOfNodes - 1 - (maxFanoutPeers - 1) + # expecting 10[numberOfNodes] - 1[Node0] - (6[maxFanoutPeers] - 1[first peer not disconnected]) = 4 + let expectedLen = numberOfNodes - 1 - (maxFanoutPeers - 1) + checkUntilCustomTimeout(timeout, interval): + node0.fanout[topic].len == expectedLen node0.fanout[topic].toSeq().allIt(it.peerId notin peersToDisconnect) asyncTest "iDontWants history - last element is pruned during heartbeat": - const topic = "foobar" + const + topic = "foobar" + historyLength = 5 + heartbeatInterval = 200.milliseconds let nodes = generateNodes( - 2, gossip = true, sendIDontWantOnPublish = true, historyLength = 5 + 2, + gossip = true, + sendIDontWantOnPublish = true, + historyLength = historyLength, + heartbeatInterval = heartbeatInterval, ) .toGossipSub() @@ -243,23 +244,27 @@ suite "GossipSub Heartbeat": await connectNodes(nodes[0], nodes[1]) subscribeAllNodes(nodes, topic, voidTopicHandler) - await waitForHeartbeat() + await waitForHeartbeat(heartbeatInterval) - # When Node0 sends 10 messages to the topic - const msgCount = 10 + # Get Node0 as Peer of Node1 + let peer = nodes[1].mesh[topic].toSeq()[0] + + # Wait for history to populate + checkUntilCustomTimeout(timeout, interval): + peer.iDontWants.len == historyLength + + # When Node0 sends 5 messages to the topic + const msgCount = 5 for i in 0 ..< msgCount: tryPublish await nodes[0].publish(topic, newSeq[byte](1000)), 1 - # Then Node1 receives 10 iDontWant messages from Node0 - let peer = nodes[1].mesh[topic].toSeq()[0] - check: + # Then Node1 receives 5 iDontWant messages from Node0 + checkUntilCustomTimeout(timeout, interval): peer.iDontWants[^1].len == msgCount # When heartbeat happens - await waitForHeartbeat() - # Then last element of iDontWants history is pruned - check: + checkUntilCustomTimeout(timeout, interval): peer.iDontWants[^1].len == 0 asyncTest "sentIHaves history - last element is pruned during heartbeat": @@ -268,12 +273,15 @@ suite "GossipSub Heartbeat": const numberOfNodes = 3 topic = "foobar" + heartbeatInterval = 200.milliseconds + historyLength = 3 let nodes = generateNodes( numberOfNodes, gossip = true, - historyLength = 3, + historyLength = historyLength, dValues = some(DValues(dLow: some(1), dHigh: some(1), d: some(1), dOut: some(0))), + heartbeatInterval = heartbeatInterval, ) .toGossipSub() @@ -282,32 +290,28 @@ suite "GossipSub Heartbeat": for i in 1 ..< numberOfNodes: await connectNodes(nodes[0], nodes[i]) subscribeAllNodes(nodes, topic, voidTopicHandler) - - # Waiting 2 heartbeats to populate sentIHaves history - await waitForHeartbeat(2) - - # When Node0 sends a messages to the topic - tryPublish await nodes[0].publish(topic, newSeq[byte](1000)), 1 + await waitForHeartbeat(heartbeatInterval) # Find Peer outside of mesh to which Node 0 will send IHave let peer = nodes[0].gossipsub[topic].toSeq().filterIt(it notin nodes[0].mesh[topic])[0] + # Wait for history to populate + checkUntilCustomTimeout(timeout, interval): + peer.sentIHaves.len == historyLength + + # When Node0 sends a messages to the topic + tryPublish await nodes[0].publish(topic, newSeq[byte](1000)), 1 + # When next heartbeat occurs - # Then IHave is sent and sentIHaves is populated - waitForCondition( - peer.sentIHaves[^1].len > 0, - 10.milliseconds, - 100.milliseconds, - "wait for sentIHaves timeout", - ) + # Then IHave is sent and sentIHaves is populated + checkUntilCustomTimeout(timeout, interval): + peer.sentIHaves[^1].len == 1 # Need to clear mCache as node would keep populating sentIHaves nodes[0].clearMCache() # When next heartbeat occurs - await waitForHeartbeat() - # Then last element of sentIHaves history is pruned - check: + checkUntilCustomTimeout(timeout, interval): peer.sentIHaves[^1].len == 0 diff --git a/tests/pubsub/testgossipsubmeshmanagement.nim b/tests/pubsub/testgossipsubmeshmanagement.nim index 07047e72c..e4175c1b0 100644 --- a/tests/pubsub/testgossipsubmeshmanagement.nim +++ b/tests/pubsub/testgossipsubmeshmanagement.nim @@ -9,7 +9,6 @@ {.used.} -import std/[sequtils] import stew/byteutils import utils import chronicles @@ -181,32 +180,26 @@ suite "GossipSub Mesh Management": let numberOfNodes = 5 topic = "foobar" - nodes = generateNodes(numberOfNodes, gossip = true) + nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub() startNodesAndDeferStop(nodes) await connectNodesStar(nodes) subscribeAllNodes(nodes, topic, voidTopicHandler) let expectedNumberOfPeers = numberOfNodes - 1 - await waitForPeersInTable( - nodes, - topic, - newSeqWith(numberOfNodes, expectedNumberOfPeers), - PeerTableType.Gossipsub, - ) for i in 0 ..< numberOfNodes: - var gossip = GossipSub(nodes[i]) - check: - gossip.gossipsub[topic].len == expectedNumberOfPeers - gossip.mesh[topic].len == expectedNumberOfPeers - gossip.fanout.len == 0 + let node = nodes[i] + checkUntilCustomTimeout(500.milliseconds, 20.milliseconds): + node.gossipsub.getOrDefault(topic).len == expectedNumberOfPeers + node.mesh.getOrDefault(topic).len == expectedNumberOfPeers + node.fanout.len == 0 asyncTest "prune peers if mesh len is higher than d_high": let numberOfNodes = 15 topic = "foobar" - nodes = generateNodes(numberOfNodes, gossip = true) + nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub() startNodesAndDeferStop(nodes) await connectNodesStar(nodes) @@ -218,20 +211,13 @@ suite "GossipSub Mesh Management": d = 6 dLow = 4 - await waitForPeersInTable( - nodes, - topic, - newSeqWith(numberOfNodes, expectedNumberOfPeers), - PeerTableType.Gossipsub, - ) - for i in 0 ..< numberOfNodes: - var gossip = GossipSub(nodes[i]) - - check: - gossip.gossipsub[topic].len == expectedNumberOfPeers - gossip.mesh[topic].len >= dLow and gossip.mesh[topic].len <= dHigh - gossip.fanout.len == 0 + let node = nodes[i] + checkUntilCustomTimeout(500.milliseconds, 20.milliseconds): + node.gossipsub.getOrDefault(topic).len == expectedNumberOfPeers + node.mesh.getOrDefault(topic).len >= dLow and + node.mesh.getOrDefault(topic).len <= dHigh + node.fanout.len == 0 asyncTest "GossipSub unsub - resub faster than backoff": # For this test to work we'd require a way to disable fanout. @@ -473,20 +459,25 @@ suite "GossipSub Mesh Management": await waitForHeartbeat() # Then all nodes should be subscribed to the topics initially - for node in nodes: - for topic in topics: - check node.topics.contains(topic) - check node.gossipsub[topic].len() == numberOfNodes - 1 - check node.mesh[topic].len() == numberOfNodes - 1 + for i in 0 ..< numberOfNodes: + let node = nodes[i] + for j in 0 ..< topics.len: + let topic = topics[j] + checkUntilCustomTimeout(500.milliseconds, 20.milliseconds): + node.topics.contains(topic) + node.gossipsub[topic].len() == numberOfNodes - 1 + node.mesh[topic].len() == numberOfNodes - 1 # When they unsubscribe from all topics for topic in topics: unsubscribeAllNodes(nodes, topic, voidTopicHandler) - await waitForHeartbeat() # Then topics should be removed from mesh and gossipsub - for node in nodes: - for topic in topics: - check topic notin node.topics - check topic notin node.mesh - check topic notin node.gossipsub + for i in 0 ..< numberOfNodes: + let node = nodes[i] + for j in 0 ..< topics.len: + let topic = topics[j] + checkUntilCustomTimeout(500.milliseconds, 20.milliseconds): + topic notin node.topics + topic notin node.mesh + topic notin node.gossipsub diff --git a/tests/pubsub/testgossipsubmessagehandling.nim b/tests/pubsub/testgossipsubmessagehandling.nim index 3e2e38849..e4e2cb5b0 100644 --- a/tests/pubsub/testgossipsubmessagehandling.nim +++ b/tests/pubsub/testgossipsubmessagehandling.nim @@ -464,9 +464,7 @@ suite "GossipSub Message Handling": # Send message that will be rejected by the receiver's validator tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 - await waitForHeartbeat() - - check: + checkUntilCustomTimeout(500.milliseconds, 20.milliseconds): recvCounter == 2 validatedCounter == 1 sendCounter == 2 diff --git a/tests/pubsub/testpubsub.nim b/tests/pubsub/testpubsub.nim index 54ee36f88..33ee1cb17 100644 --- a/tests/pubsub/testpubsub.nim +++ b/tests/pubsub/testpubsub.nim @@ -1,6 +1,6 @@ {.used.} import - testgossipsubfanout, testgossipsubgossip, testgossipsubmeshmanagement, - testgossipsubmessagehandling, testgossipsubscoring, testfloodsub, testmcache, - testtimedcache, testmessage + testgossipsubcontrolmessages, testgossipsubfanout, testgossipsubgossip, + testgossipsubheartbeat, testgossipsubmeshmanagement, testgossipsubmessagehandling, + testgossipsubscoring, testfloodsub, testmcache, testtimedcache, testmessage diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index b33f875f7..37c2649c0 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -32,6 +32,9 @@ const HEARTBEAT_TIMEOUT* = # TEST_GOSSIPSUB_HEARTBEAT_INTERVAL + 20% proc waitForHeartbeat*(multiplier: int = 1) {.async.} = await sleepAsync(HEARTBEAT_TIMEOUT * multiplier) +proc waitForHeartbeat*(timeout: Duration) {.async.} = + await sleepAsync(timeout) + type TestGossipSub* = ref object of GossipSub DValues* = object @@ -263,19 +266,6 @@ proc connectNodesSparse*[T: PubSub](nodes: seq[T], degree: int = 2) {.async.} = if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId: await connectNodes(dialer, node) -template waitForCondition*( - condition: untyped, - interval: Duration, - timeout: Duration, - timeoutErrorMessage = "waitForCondition timeout", -): untyped = - let maxTime = Moment.now() + timeout - - while not condition: - await sleepAsync(interval) - if Moment.now() >= maxTime: - raise (ref CatchableError)(msg: timeoutErrorMessage) - proc waitSub*(sender, receiver: auto, key: string) {.async.} = if sender == receiver: return @@ -284,14 +274,10 @@ proc waitSub*(sender, receiver: auto, key: string) {.async.} = # this is for testing purposes only # peers can be inside `mesh` and `fanout`, not just `gossipsub` - waitForCondition( + checkUntilCustomTimeout(5.seconds, 20.milliseconds): (fsub.gossipsub.hasKey(key) and fsub.gossipsub.hasPeerId(key, peerId)) or (fsub.mesh.hasKey(key) and fsub.mesh.hasPeerId(key, peerId)) or - (fsub.fanout.hasKey(key) and fsub.fanout.hasPeerId(key, peerId)), - 5.milliseconds, - 5.seconds, - "waitSub timeout!", - ) + (fsub.fanout.hasKey(key) and fsub.fanout.hasPeerId(key, peerId)) proc waitSubAllNodes*(nodes: seq[auto], topic: string) {.async.} = let numberOfNodes = nodes.len @@ -324,9 +310,8 @@ proc waitSubGraph*[T: PubSub](nodes: seq[T], key: string) {.async.} = return ok == nodes.len - waitForCondition( - isGraphConnected(), 10.milliseconds, 5.seconds, "waitSubGraph timeout!" - ) + checkUntilCustomTimeout(5.seconds, 20.milliseconds): + isGraphConnected() proc waitForMesh*( sender: auto, receiver: auto, key: string, timeoutDuration = 5.seconds @@ -338,12 +323,8 @@ proc waitForMesh*( gossipsubSender = GossipSub(sender) receiverPeerId = receiver.peerInfo.peerId - waitForCondition( - gossipsubSender.mesh.hasPeerId(key, receiverPeerId), - 5.milliseconds, - timeoutDuration, - "waitForMesh timeout!", - ) + checkUntilCustomTimeout(timeoutDuration, 20.milliseconds): + gossipsubSender.mesh.hasPeerId(key, receiverPeerId) type PeerTableType* {.pure.} = enum Gossipsub = "gossipsub" @@ -380,12 +361,8 @@ proc waitForPeersInTable*( satisfied[i] = currentCount >= peerCounts[i] return satisfied.allIt(it) - waitForCondition( - checkPeersCondition(), - 10.milliseconds, - timeout, - "Timeout waiting for peer counts in " & $table & " for topic " & topic, - ) + checkUntilCustomTimeout(timeout, 20.milliseconds): + checkPeersCondition() proc waitForPeersInTable*( node: auto, topic: string, peerCount: int, table: PeerTableType, timeout = 1.seconds diff --git a/tests/testhelpers.nim b/tests/testhelpers.nim index 144857298..76d15eb30 100644 --- a/tests/testhelpers.nim +++ b/tests/testhelpers.nim @@ -29,13 +29,13 @@ suite "Helpers": asyncTest "checkUntilCustomTimeout should pass when the condition is true": let a = 2 let b = 2 - checkUntilCustomTimeout(2.seconds): + checkUntilCustomTimeout(2.seconds, 100.milliseconds): a == b asyncTest "checkUntilCustomTimeout should pass when the conditions are true": let a = 2 let b = 2 - checkUntilCustomTimeout(5.seconds): + checkUntilCustomTimeout(5.seconds, 100.milliseconds): a == b a == 2 b == 2