mirror of https://github.com/vacp2p/nim-libp2p.git (synced 2026-01-09 02:38:19 -05:00)

test(gossipsub): include missing test files and handle flaky tests (#1416)
Co-authored-by: vladopajic <vladopajic@users.noreply.github.com>
Committed by GitHub
parent 8c3a4d882a, commit 86695b55bb
@@ -94,7 +94,9 @@ proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T =
   testBufferStream.initStream()
   testBufferStream
 
-macro checkUntilCustomTimeout*(timeout: Duration, code: untyped): untyped =
+macro checkUntilCustomTimeout*(
+    timeout: Duration, sleepInterval: Duration, code: untyped
+): untyped =
   ## Periodically checks a given condition until it is true or a timeout occurs.
   ##
   ## `code`: untyped - A condition expression that should eventually evaluate to true.
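For context, a usage sketch of the updated helper (a sketch only: `node`, `topic` and `expectedPeers` are hypothetical names, while the call shape and Duration literals come from the diff itself):

    # Re-evaluate the condition every 20 ms until it is true,
    # failing the test if it is still false after 500 ms.
    checkUntilCustomTimeout(500.milliseconds, 20.milliseconds):
      node.mesh.getOrDefault(topic).len == expectedPeers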
@@ -147,7 +149,7 @@ macro checkUntilCustomTimeout*(timeout: Duration, code: untyped): untyped =
         if `combinedBoolExpr`:
           return
         else:
-          await sleepAsync(100.millis)
+          await sleepAsync(`sleepInterval`)
 
     await checkExpiringInternal()
 
@@ -173,7 +175,7 @@ macro checkUntilTimeout*(code: untyped): untyped =
   ## b == 1
   ## ```
   result = quote:
-    checkUntilCustomTimeout(10.seconds, `code`)
+    checkUntilCustomTimeout(10.seconds, 100.milliseconds, `code`)
 
 proc unorderedCompare*[T](a, b: seq[T]): bool =
   if a == b:
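The one-argument `checkUntilTimeout` wrapper above keeps its old call shape and simply forwards the defaults (10 s timeout, 100 ms polling interval), so existing call sites stay unchanged; a minimal sketch:

    # Expands to checkUntilCustomTimeout(10.seconds, 100.milliseconds, ...):
    checkUntilTimeout:
      a == b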
@@ -1,11 +1,10 @@
 {.used.}
 
 import std/[sequtils]
 import stew/byteutils
 import utils
 import chronicles
 import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
-import ../helpers, ../utils/[futures]
+import ../helpers
 
 suite "GossipSub Control Messages":
   teardown:
@@ -141,6 +141,7 @@ suite "GossipSub Gossip Protocol":
       topic = "foobar"
       dValues = DValues(dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1))
       nodes = generateNodes(numberOfNodes, gossip = true, dValues = some(dValues))
+      .toGossipSub()
 
     startNodesAndDeferStop(nodes)
 
@@ -152,19 +153,17 @@ suite "GossipSub Gossip Protocol":
 
     # And subscribed to the same topic
     subscribeAllNodes(nodes, topic, voidTopicHandler)
-    await waitForPeersInTable(
-      nodes, topic, newSeqWith(numberOfNodes, 4), PeerTableType.Gossipsub
-    )
+
+    checkUntilCustomTimeout(500.milliseconds, 20.milliseconds):
+      nodes.allIt(it.gossipsub.getOrDefault(topic).len == numberOfNodes - 1)
 
     # When node 0 sends a message
-    check (await nodes[0].publish(topic, "Hello!".toBytes())) > 0
-    await waitForHeartbeat()
+    tryPublish await nodes[0].publish(topic, "Hello!".toBytes()), 1
 
     # At least one of the nodes should have received an iHave message
     # The check is made this way because the mesh structure changes from run to run
-    let receivedIHaves = messages[].mapIt(it[].len)
-    check:
-      anyIt(receivedIHaves, it > 0)
+    checkUntilCustomTimeout(500.milliseconds, 20.milliseconds):
+      messages[].mapIt(it[].len).anyIt(it > 0)
 
   asyncTest "adaptive gossip dissemination, dLazy and gossipFactor to 0":
     let
@@ -9,34 +9,31 @@ suite "GossipSub Heartbeat":
   teardown:
     checkTrackers()
 
+  const
+    timeout = 1.seconds
+    interval = 50.milliseconds
+
   asyncTest "Mesh is rebalanced during heartbeat - pruning peers":
     const
       numberOfNodes = 10
       topic = "foobar"
+      heartbeatInterval = 200.milliseconds
     let
-      nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
+      nodes = generateNodes(
+        numberOfNodes, gossip = true, heartbeatInterval = heartbeatInterval
+      )
+      .toGossipSub()
       node0 = nodes[0]
 
     startNodesAndDeferStop(nodes)
 
-    # Observe timestamps of received prune messages
-    var lastPrune = Moment.now()
-    for i in 0 ..< numberOfNodes:
-      let pubsubObserver = PubSubObserver(
-        onRecv: proc(peer: PubSubPeer, msgs: var RPCMsg) =
-          if msgs.control.isSome:
-            for msg in msgs.control.get.prune:
-              lastPrune = Moment.now()
-      )
-      nodes[i].addObserver(pubsubObserver)
-
     # Nodes are connected to Node0
     for i in 1 ..< numberOfNodes:
       await connectNodes(node0, nodes[i])
     subscribeAllNodes(nodes, topic, voidTopicHandler)
-    await waitForHeartbeat()
-    check:
-      node0.mesh[topic].len == numberOfNodes - 1
-
+    checkUntilCustomTimeout(timeout, interval):
+      node0.mesh.getOrDefault(topic).len == numberOfNodes - 1
+
     # When DValues of Node0 are updated to lower than defaults
     const
@@ -54,21 +51,9 @@ suite "GossipSub Heartbeat":
     )
     node0.parameters.applyDValues(newDValues)
 
-    # Waiting 2 hearbeats to finish pruning (2 peers first and then 3)
-    # Comparing timestamps of last received prune to confirm heartbeat interval
-    await waitForHeartbeat()
-    let after1stHearbeat = lastPrune
-
-    await waitForHeartbeat()
-    let after2ndHearbeat = lastPrune
-
-    let measuredHeartbeat = after2ndHearbeat - after1stHearbeat
-    let heartbeatDiff = measuredHeartbeat - TEST_GOSSIPSUB_HEARTBEAT_INTERVAL
-
     # Then mesh of Node0 is rebalanced and peers are pruned to adapt to new values
-    check:
+    checkUntilCustomTimeout(timeout, interval):
       node0.mesh[topic].len >= newDLow and node0.mesh[topic].len <= newDHigh
-      heartbeatDiff < 2.milliseconds # 2ms margin
 
   asyncTest "Mesh is rebalanced during heartbeat - grafting new peers":
     const
@@ -76,6 +61,7 @@ suite "GossipSub Heartbeat":
       topic = "foobar"
       dLow = 3
       dHigh = 4
+      heartbeatInterval = 200.milliseconds
     let
       nodes = generateNodes(
        numberOfNodes,
@@ -84,6 +70,7 @@ suite "GossipSub Heartbeat":
         DValues(dLow: some(dLow), dHigh: some(dHigh), d: some(3), dOut: some(1))
       ),
       pruneBackoff = 20.milliseconds,
+      heartbeatInterval = heartbeatInterval,
     )
     .toGossipSub()
     node0 = nodes[0]
@@ -94,19 +81,16 @@ suite "GossipSub Heartbeat":
     for i in 1 ..< numberOfNodes:
       await connectNodes(node0, nodes[i])
     subscribeAllNodes(nodes, topic, voidTopicHandler)
-    await waitForHeartbeat()
-
-    check:
-      node0.mesh[topic].len >= dLow and node0.mesh[topic].len <= dHigh
+    checkUntilCustomTimeout(timeout, interval):
+      node0.mesh.getOrDefault(topic).len >= dLow and
+        node0.mesh.getOrDefault(topic).len <= dHigh
 
     # When peers of Node0 mesh are disconnected
     let peersToDisconnect = node0.mesh[topic].toSeq()[1 .. ^1].mapIt(it.peerId)
     findAndUnsubscribePeers(nodes, peersToDisconnect, topic, voidTopicHandler)
 
     # Then mesh of Node0 is rebalanced and new peers are added
-    await waitForHeartbeat()
-
-    check:
+    checkUntilCustomTimeout(timeout, interval):
       node0.mesh[topic].len >= dLow and node0.mesh[topic].len <= dHigh
       node0.mesh[topic].toSeq().allIt(it.peerId notin peersToDisconnect)
 
@@ -114,6 +98,7 @@ suite "GossipSub Heartbeat":
     const
       numberOfNodes = 10
       topic = "foobar"
+      heartbeatInterval = 200.milliseconds
     let
      nodes = generateNodes(
        numberOfNodes,
@@ -130,6 +115,7 @@ suite "GossipSub Heartbeat":
       ),
       pruneBackoff = 20.milliseconds,
       opportunisticGraftThreshold = 600,
+      heartbeatInterval = heartbeatInterval,
     )
     .toGossipSub()
     node0 = nodes[0]
@@ -140,7 +126,7 @@ suite "GossipSub Heartbeat":
     for i in 1 ..< numberOfNodes:
       await connectNodes(node0, nodes[i])
     subscribeAllNodes(nodes, topic, voidTopicHandler)
-    await waitForHeartbeat()
+    await waitForHeartbeat(heartbeatInterval)
 
     # Keep track of initial mesh of Node0
     let startingMesh = node0.mesh[topic].toSeq()
@@ -159,7 +145,7 @@ suite "GossipSub Heartbeat":
         expectedGrafts &= peer
 
     # Then during heartbeat Peers with lower than median scores are pruned and max 2 Peers are grafted
-    await waitForHeartbeat()
+    await waitForHeartbeat(heartbeatInterval)
 
     let actualGrafts = node0.mesh[topic].toSeq().filterIt(it notin startingMesh)
     const maxOpportunisticGraftsPerHeartbeat = 2
@@ -171,7 +157,13 @@ suite "GossipSub Heartbeat":
     const
       numberOfNodes = 10
       topic = "foobar"
-    let nodes = generateNodes(numberOfNodes, gossip = true, fanoutTTL = 30.milliseconds)
+      heartbeatInterval = 200.milliseconds
+    let nodes = generateNodes(
+        numberOfNodes,
+        gossip = true,
+        fanoutTTL = 60.milliseconds,
+        heartbeatInterval = heartbeatInterval,
+      )
+      .toGossipSub()
 
     startNodesAndDeferStop(nodes)
@@ -180,29 +172,31 @@ suite "GossipSub Heartbeat":
     # All nodes but Node0 are subscribed to the topic
     for node in nodes[1 .. ^1]:
       node.subscribe(topic, voidTopicHandler)
-    await waitForHeartbeat()
+    await waitForHeartbeat(heartbeatInterval)
 
     # When Node0 sends a message to the topic
     let node0 = nodes[0]
-    tryPublish await node0.publish(topic, newSeq[byte](10000)), 1
+    tryPublish await node0.publish(topic, newSeq[byte](10000)), 3
 
     # Then Node0 fanout peers are populated
     let maxFanoutPeers = node0.parameters.d
-    await waitForPeersInTable(node0, topic, maxFanoutPeers, PeerTableType.Fanout)
-    check:
+    checkUntilCustomTimeout(timeout, interval):
       node0.fanout.hasKey(topic) and node0.fanout[topic].len == maxFanoutPeers
 
-    # And after heartbeat (60ms) Node0 fanout peers are dropped (because fanoutTTL=30ms)
-    await waitForHeartbeat()
-    check:
+    # And after heartbeat Node0 fanout peers are dropped (because fanoutTTL < heartbeatInterval)
+    checkUntilCustomTimeout(timeout, interval):
       not node0.fanout.hasKey(topic)
 
   asyncTest "Fanout maintenance during heartbeat - fanout peers are replenished":
     const
       numberOfNodes = 10
       topic = "foobar"
+      heartbeatInterval = 200.milliseconds
     let
-      nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
+      nodes = generateNodes(
+        numberOfNodes, gossip = true, heartbeatInterval = heartbeatInterval
+      )
+      .toGossipSub()
       node0 = nodes[0]
 
     startNodesAndDeferStop(nodes)
@@ -211,31 +205,38 @@ suite "GossipSub Heartbeat":
     # All nodes but Node0 are subscribed to the topic
     for node in nodes[1 .. ^1]:
       node.subscribe(topic, voidTopicHandler)
-    await waitForHeartbeat()
+    await waitForHeartbeat(heartbeatInterval)
 
     # When Node0 sends a message to the topic
     tryPublish await node0.publish(topic, newSeq[byte](10000)), 1
 
     # Then Node0 fanout peers are populated
     let maxFanoutPeers = node0.parameters.d
-    await waitForPeersInTable(node0, topic, maxFanoutPeers, PeerTableType.Fanout)
+    checkUntilCustomTimeout(timeout, interval):
+      node0.fanout[topic].len == maxFanoutPeers
 
     # When all peers but first one of Node0 fanout are disconnected
     let peersToDisconnect = node0.fanout[topic].toSeq()[1 .. ^1].mapIt(it.peerId)
     findAndUnsubscribePeers(nodes, peersToDisconnect, topic, voidTopicHandler)
     await waitForPeersInTable(node0, topic, 1, PeerTableType.Fanout)
 
     # Then Node0 fanout peers are replenished during heartbeat
-    await waitForHeartbeat()
-    check:
-      # expecting 10[numberOfNodes] - 1[Node0] - (6[maxFanoutPeers] - 1[first peer not disconnected]) = 4
-      node0.fanout[topic].len == numberOfNodes - 1 - (maxFanoutPeers - 1)
+    # expecting 10[numberOfNodes] - 1[Node0] - (6[maxFanoutPeers] - 1[first peer not disconnected]) = 4
+    let expectedLen = numberOfNodes - 1 - (maxFanoutPeers - 1)
+    checkUntilCustomTimeout(timeout, interval):
+      node0.fanout[topic].len == expectedLen
       node0.fanout[topic].toSeq().allIt(it.peerId notin peersToDisconnect)
 
   asyncTest "iDontWants history - last element is pruned during heartbeat":
-    const topic = "foobar"
+    const
+      topic = "foobar"
+      historyLength = 5
+      heartbeatInterval = 200.milliseconds
     let nodes = generateNodes(
-      2, gossip = true, sendIDontWantOnPublish = true, historyLength = 5
+      2,
+      gossip = true,
+      sendIDontWantOnPublish = true,
+      historyLength = historyLength,
+      heartbeatInterval = heartbeatInterval,
     )
     .toGossipSub()
 
@@ -243,23 +244,27 @@ suite "GossipSub Heartbeat":
 
     await connectNodes(nodes[0], nodes[1])
     subscribeAllNodes(nodes, topic, voidTopicHandler)
-    await waitForHeartbeat()
+    await waitForHeartbeat(heartbeatInterval)
 
-    # When Node0 sends 10 messages to the topic
-    const msgCount = 10
+    # Get Node0 as Peer of Node1
+    let peer = nodes[1].mesh[topic].toSeq()[0]
+
+    # Wait for history to populate
+    checkUntilCustomTimeout(timeout, interval):
+      peer.iDontWants.len == historyLength
+
+    # When Node0 sends 5 messages to the topic
+    const msgCount = 5
     for i in 0 ..< msgCount:
       tryPublish await nodes[0].publish(topic, newSeq[byte](1000)), 1
 
-    # Then Node1 receives 10 iDontWant messages from Node0
-    let peer = nodes[1].mesh[topic].toSeq()[0]
-    check:
+    # Then Node1 receives 5 iDontWant messages from Node0
+    checkUntilCustomTimeout(timeout, interval):
       peer.iDontWants[^1].len == msgCount
 
     # When heartbeat happens
     await waitForHeartbeat()
 
     # Then last element of iDontWants history is pruned
-    check:
+    checkUntilCustomTimeout(timeout, interval):
       peer.iDontWants[^1].len == 0
 
   asyncTest "sentIHaves history - last element is pruned during heartbeat":
@@ -268,12 +273,15 @@ suite "GossipSub Heartbeat":
     const
       numberOfNodes = 3
       topic = "foobar"
+      heartbeatInterval = 200.milliseconds
+      historyLength = 3
     let nodes = generateNodes(
       numberOfNodes,
       gossip = true,
-      historyLength = 3,
+      historyLength = historyLength,
       dValues =
         some(DValues(dLow: some(1), dHigh: some(1), d: some(1), dOut: some(0))),
+      heartbeatInterval = heartbeatInterval,
     )
     .toGossipSub()
 
@@ -282,32 +290,28 @@ suite "GossipSub Heartbeat":
     for i in 1 ..< numberOfNodes:
       await connectNodes(nodes[0], nodes[i])
     subscribeAllNodes(nodes, topic, voidTopicHandler)
 
-    # Waiting 2 heartbeats to populate sentIHaves history
-    await waitForHeartbeat(2)
-
-    # When Node0 sends a messages to the topic
-    tryPublish await nodes[0].publish(topic, newSeq[byte](1000)), 1
+    await waitForHeartbeat(heartbeatInterval)
 
     # Find Peer outside of mesh to which Node 0 will send IHave
     let peer =
       nodes[0].gossipsub[topic].toSeq().filterIt(it notin nodes[0].mesh[topic])[0]
 
+    # Wait for history to populate
+    checkUntilCustomTimeout(timeout, interval):
+      peer.sentIHaves.len == historyLength
+
+    # When Node0 sends a message to the topic
+    tryPublish await nodes[0].publish(topic, newSeq[byte](1000)), 1
 
-    # When next heartbeat occurs
-    # Then IHave is sent and sentIHaves is populated
-    waitForCondition(
-      peer.sentIHaves[^1].len > 0,
-      10.milliseconds,
-      100.milliseconds,
-      "wait for sentIHaves timeout",
-    )
+    # Then IHave is sent and sentIHaves is populated
+    checkUntilCustomTimeout(timeout, interval):
+      peer.sentIHaves[^1].len == 1
 
     # Need to clear mCache as node would keep populating sentIHaves
     nodes[0].clearMCache()
 
     # When next heartbeat occurs
     await waitForHeartbeat()
 
     # Then last element of sentIHaves history is pruned
-    check:
+    checkUntilCustomTimeout(timeout, interval):
       peer.sentIHaves[^1].len == 0
 
@@ -9,7 +9,6 @@
 
 {.used.}
 
 import std/[sequtils]
 import stew/byteutils
 import utils
 import chronicles
@@ -181,32 +180,26 @@ suite "GossipSub Mesh Management":
     let
       numberOfNodes = 5
       topic = "foobar"
-      nodes = generateNodes(numberOfNodes, gossip = true)
+      nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
 
     startNodesAndDeferStop(nodes)
     await connectNodesStar(nodes)
     subscribeAllNodes(nodes, topic, voidTopicHandler)
 
     let expectedNumberOfPeers = numberOfNodes - 1
-    await waitForPeersInTable(
-      nodes,
-      topic,
-      newSeqWith(numberOfNodes, expectedNumberOfPeers),
-      PeerTableType.Gossipsub,
-    )
 
     for i in 0 ..< numberOfNodes:
-      var gossip = GossipSub(nodes[i])
-      check:
-        gossip.gossipsub[topic].len == expectedNumberOfPeers
-        gossip.mesh[topic].len == expectedNumberOfPeers
-        gossip.fanout.len == 0
+      let node = nodes[i]
+      checkUntilCustomTimeout(500.milliseconds, 20.milliseconds):
+        node.gossipsub.getOrDefault(topic).len == expectedNumberOfPeers
+        node.mesh.getOrDefault(topic).len == expectedNumberOfPeers
+        node.fanout.len == 0
 
   asyncTest "prune peers if mesh len is higher than d_high":
     let
       numberOfNodes = 15
       topic = "foobar"
-      nodes = generateNodes(numberOfNodes, gossip = true)
+      nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
 
     startNodesAndDeferStop(nodes)
     await connectNodesStar(nodes)
@@ -218,20 +211,13 @@ suite "GossipSub Mesh Management":
       d = 6
       dLow = 4
 
-    await waitForPeersInTable(
-      nodes,
-      topic,
-      newSeqWith(numberOfNodes, expectedNumberOfPeers),
-      PeerTableType.Gossipsub,
-    )
-
     for i in 0 ..< numberOfNodes:
-      var gossip = GossipSub(nodes[i])
-
-      check:
-        gossip.gossipsub[topic].len == expectedNumberOfPeers
-        gossip.mesh[topic].len >= dLow and gossip.mesh[topic].len <= dHigh
-        gossip.fanout.len == 0
+      let node = nodes[i]
+      checkUntilCustomTimeout(500.milliseconds, 20.milliseconds):
+        node.gossipsub.getOrDefault(topic).len == expectedNumberOfPeers
+        node.mesh.getOrDefault(topic).len >= dLow and
+          node.mesh.getOrDefault(topic).len <= dHigh
+        node.fanout.len == 0
 
   asyncTest "GossipSub unsub - resub faster than backoff":
     # For this test to work we'd require a way to disable fanout.
@@ -473,20 +459,25 @@ suite "GossipSub Mesh Management":
     await waitForHeartbeat()
 
     # Then all nodes should be subscribed to the topics initially
-    for node in nodes:
-      for topic in topics:
-        check node.topics.contains(topic)
-        check node.gossipsub[topic].len() == numberOfNodes - 1
-        check node.mesh[topic].len() == numberOfNodes - 1
+    for i in 0 ..< numberOfNodes:
+      let node = nodes[i]
+      for j in 0 ..< topics.len:
+        let topic = topics[j]
+        checkUntilCustomTimeout(500.milliseconds, 20.milliseconds):
+          node.topics.contains(topic)
+          node.gossipsub[topic].len() == numberOfNodes - 1
+          node.mesh[topic].len() == numberOfNodes - 1
 
     # When they unsubscribe from all topics
     for topic in topics:
       unsubscribeAllNodes(nodes, topic, voidTopicHandler)
     await waitForHeartbeat()
 
     # Then topics should be removed from mesh and gossipsub
-    for node in nodes:
-      for topic in topics:
-        check topic notin node.topics
-        check topic notin node.mesh
-        check topic notin node.gossipsub
+    for i in 0 ..< numberOfNodes:
+      let node = nodes[i]
+      for j in 0 ..< topics.len:
+        let topic = topics[j]
+        checkUntilCustomTimeout(500.milliseconds, 20.milliseconds):
+          topic notin node.topics
+          topic notin node.mesh
+          topic notin node.gossipsub
 
@@ -464,9 +464,7 @@ suite "GossipSub Message Handling":
     # Send message that will be rejected by the receiver's validator
     tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1
 
-    await waitForHeartbeat()
-
-    check:
+    checkUntilCustomTimeout(500.milliseconds, 20.milliseconds):
       recvCounter == 2
       validatedCounter == 1
       sendCounter == 2
 
@@ -1,6 +1,6 @@
 {.used.}
 
 import
-  testgossipsubfanout, testgossipsubgossip, testgossipsubmeshmanagement,
-  testgossipsubmessagehandling, testgossipsubscoring, testfloodsub, testmcache,
-  testtimedcache, testmessage
+  testgossipsubcontrolmessages, testgossipsubfanout, testgossipsubgossip,
+  testgossipsubheartbeat, testgossipsubmeshmanagement, testgossipsubmessagehandling,
+  testgossipsubscoring, testfloodsub, testmcache, testtimedcache, testmessage
 
@@ -32,6 +32,9 @@ const HEARTBEAT_TIMEOUT* = # TEST_GOSSIPSUB_HEARTBEAT_INTERVAL + 20%
 proc waitForHeartbeat*(multiplier: int = 1) {.async.} =
   await sleepAsync(HEARTBEAT_TIMEOUT * multiplier)
 
+proc waitForHeartbeat*(timeout: Duration) {.async.} =
+  await sleepAsync(timeout)
+
 type
   TestGossipSub* = ref object of GossipSub
   DValues* = object
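Both `waitForHeartbeat` overloads now coexist; the call forms used throughout this diff are, as a sketch:

    await waitForHeartbeat()                  # one default heartbeat period (HEARTBEAT_TIMEOUT)
    await waitForHeartbeat(2)                 # two default periods
    await waitForHeartbeat(heartbeatInterval) # explicit Duration, for tests with a custom interval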
@@ -263,19 +266,6 @@ proc connectNodesSparse*[T: PubSub](nodes: seq[T], degree: int = 2) {.async.} =
       if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
         await connectNodes(dialer, node)
 
-template waitForCondition*(
-    condition: untyped,
-    interval: Duration,
-    timeout: Duration,
-    timeoutErrorMessage = "waitForCondition timeout",
-): untyped =
-  let maxTime = Moment.now() + timeout
-
-  while not condition:
-    await sleepAsync(interval)
-    if Moment.now() >= maxTime:
-      raise (ref CatchableError)(msg: timeoutErrorMessage)
-
 proc waitSub*(sender, receiver: auto, key: string) {.async.} =
   if sender == receiver:
     return
@@ -284,14 +274,10 @@ proc waitSub*(sender, receiver: auto, key: string) {.async.} =
 
   # this is for testing purposes only
   # peers can be inside `mesh` and `fanout`, not just `gossipsub`
-  waitForCondition(
+  checkUntilCustomTimeout(5.seconds, 20.milliseconds):
     (fsub.gossipsub.hasKey(key) and fsub.gossipsub.hasPeerId(key, peerId)) or
       (fsub.mesh.hasKey(key) and fsub.mesh.hasPeerId(key, peerId)) or
-      (fsub.fanout.hasKey(key) and fsub.fanout.hasPeerId(key, peerId)),
-    5.milliseconds,
-    5.seconds,
-    "waitSub timeout!",
-  )
+      (fsub.fanout.hasKey(key) and fsub.fanout.hasPeerId(key, peerId))
 
 proc waitSubAllNodes*(nodes: seq[auto], topic: string) {.async.} =
   let numberOfNodes = nodes.len
@@ -324,9 +310,8 @@ proc waitSubGraph*[T: PubSub](nodes: seq[T], key: string) {.async.} =
 
     return ok == nodes.len
 
-  waitForCondition(
-    isGraphConnected(), 10.milliseconds, 5.seconds, "waitSubGraph timeout!"
-  )
+  checkUntilCustomTimeout(5.seconds, 20.milliseconds):
+    isGraphConnected()
 
 proc waitForMesh*(
   sender: auto, receiver: auto, key: string, timeoutDuration = 5.seconds
@@ -338,12 +323,8 @@ proc waitForMesh*(
     gossipsubSender = GossipSub(sender)
     receiverPeerId = receiver.peerInfo.peerId
 
-  waitForCondition(
-    gossipsubSender.mesh.hasPeerId(key, receiverPeerId),
-    5.milliseconds,
-    timeoutDuration,
-    "waitForMesh timeout!",
-  )
+  checkUntilCustomTimeout(timeoutDuration, 20.milliseconds):
+    gossipsubSender.mesh.hasPeerId(key, receiverPeerId)
 
 type PeerTableType* {.pure.} = enum
   Gossipsub = "gossipsub"
@@ -380,12 +361,8 @@ proc waitForPeersInTable*(
       satisfied[i] = currentCount >= peerCounts[i]
     return satisfied.allIt(it)
 
-  waitForCondition(
-    checkPeersCondition(),
-    10.milliseconds,
-    timeout,
-    "Timeout waiting for peer counts in " & $table & " for topic " & topic,
-  )
+  checkUntilCustomTimeout(timeout, 20.milliseconds):
+    checkPeersCondition()
 
 proc waitForPeersInTable*(
   node: auto, topic: string, peerCount: int, table: PeerTableType, timeout = 1.seconds
@@ -29,13 +29,13 @@ suite "Helpers":
   asyncTest "checkUntilCustomTimeout should pass when the condition is true":
     let a = 2
     let b = 2
-    checkUntilCustomTimeout(2.seconds):
+    checkUntilCustomTimeout(2.seconds, 100.milliseconds):
      a == b
 
   asyncTest "checkUntilCustomTimeout should pass when the conditions are true":
     let a = 2
     let b = 2
-    checkUntilCustomTimeout(5.seconds):
+    checkUntilCustomTimeout(5.seconds, 100.milliseconds):
       a == b
       a == 2
       b == 2