From c1ef01155689133788a8c9d4bbc1975ae1938360 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Kami=C5=84ski?= Date: Wed, 14 May 2025 17:15:31 +0100 Subject: [PATCH] test(gossipsub): refactor testgossipinternal (#1366) --- tests/pubsub/testgossipsubfanout.nim | 99 +++------ tests/pubsub/testgossipsubgossip.nim | 187 +++++------------ tests/pubsub/testgossipsubmeshmanagement.nim | 188 +++++------------- tests/pubsub/testgossipsubmessagehandling.nim | 28 +-- tests/pubsub/testgossipsubscoring.nim | 25 +-- tests/pubsub/utils.nim | 77 ++++++- 6 files changed, 197 insertions(+), 407 deletions(-) diff --git a/tests/pubsub/testgossipsubfanout.nim b/tests/pubsub/testgossipsubfanout.nim index 0823b3f9a..d0d706ac6 100644 --- a/tests/pubsub/testgossipsubfanout.nim +++ b/tests/pubsub/testgossipsubfanout.nim @@ -22,88 +22,44 @@ suite "GossipSub Fanout Management": checkTrackers() asyncTest "`replenishFanout` Degree Lo": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - discard - let topic = "foobar" - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - - var conns = newSeq[Connection]() - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - var peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler - gossipSub.gossipsub[topic].incl(peer) + let (gossipSub, conns, peers) = + setupGossipSubWithPeers(15, topic, populateGossipsub = true) + defer: + await teardownGossipSub(gossipSub, conns) check gossipSub.gossipsub[topic].len == 15 gossipSub.replenishFanout(topic) check gossipSub.fanout[topic].len == gossipSub.parameters.d - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "`dropFanoutPeers` drop expired fanout topics": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - discard - let topic = "foobar" - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.fanout[topic] = initHashSet[PubSubPeer]() - gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis) - await sleepAsync(5.millis) # allow the topic to expire + let (gossipSub, conns, peers) = + setupGossipSubWithPeers(6, topic, populateGossipsub = true, populateFanout = true) + defer: + await teardownGossipSub(gossipSub, conns) - var conns = newSeq[Connection]() - for i in 0 ..< 6: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler - gossipSub.fanout[topic].incl(peer) + gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis) + await sleepAsync(5.millis) # allow the topic to expire check gossipSub.fanout[topic].len == gossipSub.parameters.d gossipSub.dropFanoutPeers() check topic notin gossipSub.fanout - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "`dropFanoutPeers` leave unexpired fanout topics": - let gossipSub = TestGossipSub.init(newStandardSwitch()) + let + topic1 = "foobar1" + topic2 = "foobar2" + let (gossipSub, conns, peers) = setupGossipSubWithPeers( + 6, @[topic1, topic2], populateGossipsub = true, populateFanout = true + ) + defer: + await teardownGossipSub(gossipSub, conns) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - discard - - let topic1 = "foobar1" - let topic2 = "foobar2" - gossipSub.topicParams[topic1] = TopicParams.init() - gossipSub.topicParams[topic2] = TopicParams.init() - gossipSub.fanout[topic1] = initHashSet[PubSubPeer]() - gossipSub.fanout[topic2] = initHashSet[PubSubPeer]() gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis) gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes) - await sleepAsync(5.millis) # allow the topic to expire - - var conns = newSeq[Connection]() - for i in 0 ..< 6: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler - gossipSub.fanout[topic1].incl(peer) - gossipSub.fanout[topic2].incl(peer) + await sleepAsync(5.millis) # allow first topic to expire check gossipSub.fanout[topic1].len == gossipSub.parameters.d check gossipSub.fanout[topic2].len == gossipSub.parameters.d @@ -112,14 +68,8 @@ suite "GossipSub Fanout Management": check topic1 notin gossipSub.fanout check topic2 in gossipSub.fanout - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "e2e - GossipSub send over fanout A -> B": - var passed = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async.} = - check topic == "foobar" - passed.complete() + let (passed, handler) = createCompleteHandler() let nodes = generateNodes(2, gossip = true) @@ -153,15 +103,12 @@ suite "GossipSub Fanout Management": gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) not gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) - await passed.wait(2.seconds) + discard await passed.wait(2.seconds) check observed == 2 asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic": - var passed = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async.} = - check topic == "foobar" - passed.complete() + let (passed, handler) = createCompleteHandler() let nodes = generateNodes(2, gossip = true, unsubscribeBackoff = 10.minutes) @@ -191,6 +138,6 @@ suite "GossipSub Fanout Management": GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0 GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 - await passed.wait(2.seconds) + discard await passed.wait(2.seconds) trace "test done, stopping..." diff --git a/tests/pubsub/testgossipsubgossip.nim b/tests/pubsub/testgossipsubgossip.nim index 5f8bd1aa0..e396d7056 100644 --- a/tests/pubsub/testgossipsubgossip.nim +++ b/tests/pubsub/testgossipsubgossip.nim @@ -24,26 +24,14 @@ suite "GossipSub Gossip Protocol": checkTrackers() asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - discard - let topic = "foobar" - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.fanout[topic] = initHashSet[PubSubPeer]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - var conns = newSeq[Connection]() + let (gossipSub, conns, peers) = setupGossipSubWithPeers(45, topic) + defer: + await teardownGossipSub(gossipSub, conns) - # generate mesh and fanout peers + # generate mesh and fanout peers for i in 0 ..< 30: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler + let peer = peers[i] if i mod 2 == 0: gossipSub.fanout[topic].incl(peer) else: @@ -51,57 +39,36 @@ suite "GossipSub Gossip Protocol": gossipSub.mesh[topic].incl(peer) # generate gossipsub (free standing) peers - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler + for i in 30 ..< 45: + let peer = peers[i] gossipSub.gossipsub[topic].incl(peer) # generate messages var seqno = 0'u64 for i in 0 .. 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId + let conn = conns[i] inc seqno - let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) + let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) check gossipSub.fanout[topic].len == 15 check gossipSub.mesh[topic].len == 15 check gossipSub.gossipsub[topic].len == 15 - let peers = gossipSub.getGossipPeers() - check peers.len == gossipSub.parameters.d - for p in peers.keys: + let gossipPeers = gossipSub.getGossipPeers() + check gossipPeers.len == gossipSub.parameters.d + for p in gossipPeers.keys: check not gossipSub.fanout.hasPeerId(topic, p.peerId) check not gossipSub.mesh.hasPeerId(topic, p.peerId) - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "`getGossipPeers` - should not crash on missing topics in mesh": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - discard - let topic = "foobar" - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.fanout[topic] = initHashSet[PubSubPeer]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - var conns = newSeq[Connection]() - for i in 0 ..< 30: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler + let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic) + defer: + await teardownGossipSub(gossipSub, conns) + + # generate mesh and fanout peers + for i, peer in peers: if i mod 2 == 0: gossipSub.fanout[topic].incl(peer) else: @@ -110,38 +77,22 @@ suite "GossipSub Gossip Protocol": # generate messages var seqno = 0'u64 for i in 0 .. 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId + let conn = conns[i] inc seqno - let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) + let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) - let peers = gossipSub.getGossipPeers() - check peers.len == gossipSub.parameters.d - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + let gossipPeers = gossipSub.getGossipPeers() + check gossipPeers.len == gossipSub.parameters.d asyncTest "`getGossipPeers` - should not crash on missing topics in fanout": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - discard - let topic = "foobar" - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - var conns = newSeq[Connection]() - for i in 0 ..< 30: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler + let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic) + defer: + await teardownGossipSub(gossipSub, conns) + + # generate mesh and fanout peers + for i, peer in peers: if i mod 2 == 0: gossipSub.mesh[topic].incl(peer) gossipSub.grafted(peer, topic) @@ -151,38 +102,22 @@ suite "GossipSub Gossip Protocol": # generate messages var seqno = 0'u64 for i in 0 .. 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId + let conn = conns[i] inc seqno - let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) + let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) - let peers = gossipSub.getGossipPeers() - check peers.len == gossipSub.parameters.d - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + let gossipPeers = gossipSub.getGossipPeers() + check gossipPeers.len == gossipSub.parameters.d asyncTest "`getGossipPeers` - should not crash on missing topics in gossip": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - discard - let topic = "foobar" - gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.fanout[topic] = initHashSet[PubSubPeer]() - var conns = newSeq[Connection]() - for i in 0 ..< 30: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler + let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic) + defer: + await teardownGossipSub(gossipSub, conns) + + # generate mesh and fanout peers + for i, peer in peers: if i mod 2 == 0: gossipSub.mesh[topic].incl(peer) gossipSub.grafted(peer, topic) @@ -192,47 +127,22 @@ suite "GossipSub Gossip Protocol": # generate messages var seqno = 0'u64 for i in 0 .. 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId + let conn = conns[i] inc seqno - let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno)) + let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno)) gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg) - let peers = gossipSub.getGossipPeers() - check peers.len == 0 - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() + let gossipPeers = gossipSub.getGossipPeers() + check gossipPeers.len == 0 asyncTest "handleIHave/Iwant tests": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - check false - - proc handler2(topic: string, data: seq[byte]) {.async.} = - discard - let topic = "foobar" - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.subscribe(topic, handler2) + var (gossipSub, conns, peers) = + setupGossipSubWithPeers(30, topic, populateMesh = true) + defer: + await teardownGossipSub(gossipSub, conns) - # Instantiates 30 peers and connects all of them to the previously defined `gossipSub` - for i in 0 ..< 30: - # Define a new connection - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler - # Add the connection to `gossipSub`, to their `gossipSub.gossipsub` and `gossipSub.mesh` tables - gossipSub.grafted(peer, topic) - gossipSub.mesh[topic].incl(peer) + gossipSub.subscribe(topic, voidTopicHandler) # Peers with no budget should not request messages block: @@ -296,9 +206,6 @@ suite "GossipSub Gossip Protocol": check gossipSub.mcache.msgs.len == 1 - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "messages sent to peers not in the mesh are propagated via gossip": let numberOfNodes = 5 diff --git a/tests/pubsub/testgossipsubmeshmanagement.nim b/tests/pubsub/testgossipsubmeshmanagement.nim index 3cd0f7985..30e79f16d 100644 --- a/tests/pubsub/testgossipsubmeshmanagement.nim +++ b/tests/pubsub/testgossipsubmeshmanagement.nim @@ -25,28 +25,14 @@ suite "GossipSub Mesh Management": params.validateParameters().tryGet() asyncTest "subscribe/unsubscribeAll": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} = - discard - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) + let (gossipSub, conns, peers) = + setupGossipSubWithPeers(15, topic, populateGossipsub = true) + defer: + await teardownGossipSub(gossipSub, conns) # test via dynamic dispatch - gossipSub.PubSub.subscribe(topic, handler) + gossipSub.PubSub.subscribe(topic, voidTopicHandler) check: gossipSub.topics.contains(topic) @@ -61,53 +47,27 @@ suite "GossipSub Mesh Management": topic notin gossipSub.mesh # not in mesh topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out) - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "`rebalanceMesh` Degree Lo": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) + let (gossipSub, conns, peers) = + setupGossipSubWithPeers(15, topic, populateGossipsub = true) + defer: + await teardownGossipSub(gossipSub, conns) check gossipSub.peers.len == 15 gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == gossipSub.parameters.d - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "rebalanceMesh - bad peers": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() + let (gossipSub, conns, peers) = + setupGossipSubWithPeers(15, topic, populateGossipsub = true) + defer: + await teardownGossipSub(gossipSub, conns) - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var scoreLow = -11'f64 - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn + for peer in peers: peer.score = scoreLow - gossipSub.gossipsub[topic].incl(peer) scoreLow += 1.0 check gossipSub.peers.len == 15 @@ -117,54 +77,28 @@ suite "GossipSub Mesh Management": for peer in gossipSub.mesh[topic]: check peer.score >= 0.0 - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "`rebalanceMesh` Degree Hi": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - gossipSub.grafted(peer, topic) - gossipSub.mesh[topic].incl(peer) + let (gossipSub, conns, peers) = + setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true) + defer: + await teardownGossipSub(gossipSub, conns) check gossipSub.mesh[topic].len == 15 gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == gossipSub.parameters.d + gossipSub.parameters.dScore - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "rebalanceMesh fail due to backoff": - let gossipSub = TestGossipSub.init(newStandardSwitch()) let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) + let (gossipSub, conns, peers) = + setupGossipSubWithPeers(15, topic, populateGossipsub = true) + defer: + await teardownGossipSub(gossipSub, conns) + for peer in peers: gossipSub.backingOff.mgetOrPut(topic, initTable[PeerId, Moment]()).add( - peerId, Moment.now() + 1.hours + peer.peerId, Moment.now() + 1.hours ) let prunes = gossipSub.handleGraft(peer, @[ControlGraft(topicID: topic)]) # there must be a control prune due to violation of backoff @@ -175,34 +109,18 @@ suite "GossipSub Mesh Management": # expect 0 since they are all backing off check gossipSub.mesh[topic].len == 0 - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "rebalanceMesh fail due to backoff - remote": - let gossipSub = TestGossipSub.init(newStandardSwitch()) let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() - - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - for i in 0 ..< 15: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) - gossipSub.mesh[topic].incl(peer) + let (gossipSub, conns, peers) = + setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true) + defer: + await teardownGossipSub(gossipSub, conns) check gossipSub.peers.len == 15 gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len != 0 - for i in 0 ..< 15: - let peerId = conns[i].peerId - let peer = gossipSub.getPubSubPeer(peerId) + for peer in peers: gossipSub.handlePrune( peer, @[ @@ -217,45 +135,36 @@ suite "GossipSub Mesh Management": # expect topic cleaned up since they are all pruned check topic notin gossipSub.mesh - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "rebalanceMesh Degree Hi - audit scenario": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[PubSubPeer]() - gossipSub.topicParams[topic] = TopicParams.init() + let + topic = "foobar" + numInPeers = 6 + numOutPeers = 7 + totalPeers = numInPeers + numOutPeers + + let (gossipSub, conns, peers) = setupGossipSubWithPeers( + totalPeers, topic, populateGossipsub = true, populateMesh = true + ) + defer: + await teardownGossipSub(gossipSub, conns) + gossipSub.parameters.dScore = 4 gossipSub.parameters.d = 6 gossipSub.parameters.dOut = 3 gossipSub.parameters.dHigh = 12 gossipSub.parameters.dLow = 4 - var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() - for i in 0 ..< 6: - let conn = TestBufferStream.new(noop) + for i in 0 ..< numInPeers: + let conn = conns[i] + let peer = peers[i] conn.transportDir = Direction.In - conns &= conn - let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) peer.score = 40.0 - peer.sendConn = conn - gossipSub.grafted(peer, topic) - gossipSub.mesh[topic].incl(peer) - for i in 0 ..< 7: - let conn = TestBufferStream.new(noop) + for i in numInPeers ..< totalPeers: + let conn = conns[i] + let peer = peers[i] conn.transportDir = Direction.Out - conns &= conn - let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) peer.score = 10.0 - peer.sendConn = conn - gossipSub.grafted(peer, topic) - gossipSub.mesh[topic].incl(peer) check gossipSub.mesh[topic].len == 13 gossipSub.rebalanceMesh(topic) @@ -268,9 +177,6 @@ suite "GossipSub Mesh Management": # ensure we give priority and keep at least dOut outbound peers check outbound >= gossipSub.parameters.dOut - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "dont prune peers if mesh len is less than d_high": let numberOfNodes = 5 diff --git a/tests/pubsub/testgossipsubmessagehandling.nim b/tests/pubsub/testgossipsubmessagehandling.nim index d9325e968..adc6e8a3f 100644 --- a/tests/pubsub/testgossipsubmessagehandling.nim +++ b/tests/pubsub/testgossipsubmessagehandling.nim @@ -78,38 +78,22 @@ suite "GossipSub Message Handling": checkTrackers() asyncTest "Drop messages of topics without subscription": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - check false - let topic = "foobar" - var conns = newSeq[Connection]() - for i in 0 ..< 30: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler + var (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic) + defer: + await teardownGossipSub(gossipSub, conns) # generate messages var seqno = 0'u64 for i in 0 .. 5: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) + let conn = conns[i] + let peer = peers[i] inc seqno - let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno)) + let msg = Message.init(conn.peerId, ("bar" & $i).toBytes(), topic, some(seqno)) await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false)) check gossipSub.mcache.msgs.len == 0 - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "subscription limits": let gossipSub = TestGossipSub.init(newStandardSwitch()) gossipSub.topicsHigh = 10 diff --git a/tests/pubsub/testgossipsubscoring.nim b/tests/pubsub/testgossipsubscoring.nim index 98eb9082f..d443e2406 100644 --- a/tests/pubsub/testgossipsubscoring.nim +++ b/tests/pubsub/testgossipsubscoring.nim @@ -23,24 +23,18 @@ suite "GossipSub Scoring": checkTrackers() asyncTest "Disconnect bad peers": - let gossipSub = TestGossipSub.init(newStandardSwitch()) + let topic = "foobar" + var (gossipSub, conns, peers) = + setupGossipSubWithPeers(30, topic, populateGossipsub = true) + defer: + await teardownGossipSub(gossipSub, conns) + gossipSub.parameters.disconnectBadPeers = true gossipSub.parameters.appSpecificWeight = 1.0 - proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = - check false - let topic = "foobar" - var conns = newSeq[Connection]() - for i in 0 ..< 30: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.sendConn = conn - peer.handler = handler + for i, peer in peers: peer.appScore = gossipSub.parameters.graylistThreshold - 1 - gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer) + let conn = conns[i] gossipSub.switch.connManager.storeMuxer(Muxer(connection: conn)) gossipSub.updateScores() @@ -53,9 +47,6 @@ suite "GossipSub Scoring": # also ensure we cleanup properly the peersInIP table gossipSub.peersInIP.len == 0 - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - asyncTest "flood publish to all peers with score above threshold, regardless of subscription": let numberOfNodes = 3 diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 715eaef56..fa5339c18 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -42,6 +42,21 @@ type dOut*: Option[int] dLazy*: Option[int] +proc noop*(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = + discard + +proc voidTopicHandler*(topic: string, data: seq[byte]) {.async.} = + discard + +proc voidPeerHandler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = + discard + +proc randomPeerId*(): PeerId = + try: + PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() + except CatchableError as exc: + raise newException(Defect, exc.msg) + proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer = proc getConn(): Future[Connection] {. async: (raises: [CancelledError, GetConnDialError]) @@ -61,11 +76,57 @@ proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer = onNewPeer(p, pubSubPeer) pubSubPeer -proc randomPeerId*(): PeerId = - try: - PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet() - except CatchableError as exc: - raise newException(Defect, exc.msg) +proc setupGossipSubWithPeers*( + numPeers: int, + topics: seq[string], + populateGossipsub: bool = false, + populateMesh: bool = false, + populateFanout: bool = false, +): (TestGossipSub, seq[Connection], seq[PubSubPeer]) = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + for topic in topics: + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() + + var conns = newSeq[Connection]() + var peers = newSeq[PubSubPeer]() + for i in 0 ..< numPeers: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + peer.handler = voidPeerHandler + peers &= peer + for topic in topics: + if (populateGossipsub): + gossipSub.gossipsub[topic].incl(peer) + if (populateMesh): + gossipSub.grafted(peer, topic) + gossipSub.mesh[topic].incl(peer) + if (populateFanout): + gossipSub.fanout[topic].incl(peer) + + return (gossipSub, conns, peers) + +proc setupGossipSubWithPeers*( + numPeers: int, + topic: string, + populateGossipsub: bool = false, + populateMesh: bool = false, + populateFanout: bool = false, +): (TestGossipSub, seq[Connection], seq[PubSubPeer]) = + return setupGossipSubWithPeers( + numPeers, @[topic], populateGossipsub, populateMesh, populateFanout + ) + +proc teardownGossipSub*(gossipSub: TestGossipSub, conns: seq[Connection]) {.async.} = + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] = let mid = @@ -371,12 +432,6 @@ template tryPublish*( doAssert pubs >= require, "Failed to publish!" -proc noop*(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = - discard - -proc voidTopicHandler*(topic: string, data: seq[byte]) {.async.} = - discard - proc createCompleteHandler*(): ( Future[bool], proc(topic: string, data: seq[byte]) {.async.} ) =