test(gossipsub): message propagation (#1184)

Co-authored-by: Radoslaw Kaminski <radoslaw@status.im>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Álex
2025-04-14 16:49:13 +02:00
committed by GitHub
parent fb41972ba3
commit ac25da6cea
10 changed files with 762 additions and 396 deletions

View File

@@ -14,8 +14,10 @@ import ../libp2p/protocols/secure/secure
import ../libp2p/switch
import ../libp2p/nameresolving/[nameresolver, mockresolver]
import "."/[asyncunit, errorhelpers]
export asyncunit, errorhelpers, mockresolver
import errorhelpers
import utils/async_tests
export async_tests, errorhelpers, mockresolver
const
StreamTransportTrackerName = "stream.transport"

View File

@@ -49,13 +49,10 @@ suite "FloodSub":
check topic == "foobar"
completionFut.complete(true)
let
nodes = generateNodes(2)
let nodes = generateNodes(2)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@@ -71,48 +68,33 @@ suite "FloodSub":
agentA == "nim-libp2p"
agentB == "nim-libp2p"
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "FloodSub basic publish/subscribe B -> A":
var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
completionFut.complete(true)
let
nodes = generateNodes(2)
let nodes = generateNodes(2)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
await waitSub(nodes[1], nodes[0], "foobar")
check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0
check (await completionFut.wait(5.seconds)) == true
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut)
asyncTest "FloodSub validation should succeed":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2)
let nodes = generateNodes(2)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@@ -130,21 +112,15 @@ suite "FloodSub":
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
check (await handlerFut) == true
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut)
asyncTest "FloodSub validation should fail":
proc handler(topic: string, data: seq[byte]) {.async.} =
check false # if we get here, it should fail
let
nodes = generateNodes(2)
let nodes = generateNodes(2)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
await subscribeNodes(nodes)
nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@@ -159,23 +135,17 @@ suite "FloodSub":
discard await nodes[0].publish("foobar", "Hello!".toBytes())
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut)
asyncTest "FloodSub validation one fails and one succeeds":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foo"
handlerFut.complete(true)
let
nodes = generateNodes(2)
let nodes = generateNodes(2)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
await subscribeNodes(nodes)
nodes[1].subscribe("foo", handler)
await waitSub(nodes[0], nodes[1], "foo")
nodes[1].subscribe("bar", handler)
@@ -194,10 +164,6 @@ suite "FloodSub":
check (await nodes[0].publish("foo", "Hello!".toBytes())) > 0
check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut)
asyncTest "FloodSub multiple peers, no self trigger":
var runs = 10
@@ -219,11 +185,10 @@ suite "FloodSub":
counter,
)
let
nodes = generateNodes(runs, triggerSelf = false)
nodesFut = nodes.mapIt(it.switch.start())
let nodes = generateNodes(runs, triggerSelf = false)
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
for i in 0 ..< runs:
nodes[i].subscribe("foobar", futs[i][1])
@@ -241,9 +206,6 @@ suite "FloodSub":
await allFuturesThrowing(pubs)
await allFuturesThrowing(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "FloodSub multiple peers, with self trigger":
var runs = 10
@@ -266,11 +228,10 @@ suite "FloodSub":
counter,
)
let
nodes = generateNodes(runs, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
let nodes = generateNodes(runs, triggerSelf = true)
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
for i in 0 ..< runs:
nodes[i].subscribe("foobar", futs[i][1])
@@ -299,10 +260,6 @@ suite "FloodSub":
# remove the topic tho
node.topics.len == 0
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "FloodSub message size validation":
var messageReceived = 0
proc handler(topic: string, data: seq[byte]) {.async.} =
@@ -313,11 +270,9 @@ suite "FloodSub":
bigNode = generateNodes(1)
smallNode = generateNodes(1, maxMessageSize = 200)
# start switches
nodesFut =
await allFinished(bigNode[0].switch.start(), smallNode[0].switch.start())
startNodesAndDeferStop(bigNode & smallNode)
await connectNodesStar(bigNode & smallNode)
await subscribeNodes(bigNode & smallNode)
bigNode[0].subscribe("foo", handler)
smallNode[0].subscribe("foo", handler)
await waitSub(bigNode[0], smallNode[0], "foo")
@@ -337,10 +292,6 @@ suite "FloodSub":
check (await smallNode[0].publish("foo", bigMessage)) > 0
check (await bigNode[0].publish("foo", bigMessage)) > 0
await allFuturesThrowing(smallNode[0].switch.stop(), bigNode[0].switch.stop())
await allFuturesThrowing(nodesFut)
asyncTest "FloodSub message size validation 2":
var messageReceived = 0
proc handler(topic: string, data: seq[byte]) {.async.} =
@@ -350,11 +301,9 @@ suite "FloodSub":
bigNode1 = generateNodes(1, maxMessageSize = 20000000)
bigNode2 = generateNodes(1, maxMessageSize = 20000000)
# start switches
nodesFut =
await allFinished(bigNode1[0].switch.start(), bigNode2[0].switch.start())
startNodesAndDeferStop(bigNode1 & bigNode2)
await connectNodesStar(bigNode1 & bigNode2)
await subscribeNodes(bigNode1 & bigNode2)
bigNode2[0].subscribe("foo", handler)
await waitSub(bigNode1[0], bigNode2[0], "foo")
@@ -364,7 +313,3 @@ suite "FloodSub":
checkUntilTimeout:
messageReceived == 1
await allFuturesThrowing(bigNode1[0].switch.stop(), bigNode2[0].switch.stop())
await allFuturesThrowing(nodesFut)

View File

@@ -13,10 +13,10 @@ import sequtils, options, tables, sets, sugar
import chronos, stew/byteutils, chronos/ratelimit
import chronicles
import metrics
import results
import
utils,
../../libp2p/[
errors,
peerid,
peerinfo,
stream/connection,
@@ -31,7 +31,9 @@ import
protocols/pubsub/rpc/messages,
]
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
import ../helpers
import ../helpers, ../utils/[futures, async_tests]
from ../../libp2p/protocols/pubsub/mcache import window
proc `$`(peer: PubSubPeer): string =
shortLog(peer)
@@ -58,13 +60,10 @@ suite "GossipSub":
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -88,21 +87,14 @@ suite "GossipSub":
check (await validatorFut) and (await handlerFut)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub validation should fail (reject)":
proc handler(topic: string, data: seq[byte]) {.async.} =
check false # if we get here, it should fail
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -128,21 +120,14 @@ suite "GossipSub":
check (await validatorFut) == true
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub validation should fail (ignore)":
proc handler(topic: string, data: seq[byte]) {.async.} =
check false # if we get here, it should fail
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -168,23 +153,16 @@ suite "GossipSub":
check (await validatorFut) == true
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub validation one fails and one succeeds":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foo"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[1].subscribe("foo", handler)
nodes[1].subscribe("bar", handler)
@@ -216,10 +194,6 @@ suite "GossipSub":
"bar" notin gossip1.mesh and gossip1.fanout["bar"].len == 1
"bar" notin gossip2.mesh and "bar" notin gossip2.fanout
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub's observers should run after message is sent, received and validated":
var
recvCounter = 0
@@ -242,10 +216,9 @@ suite "GossipSub":
let obs1 = PubSubObserver(onRecv: onRecv, onValidated: onValidated)
let nodes = generateNodes(2, gossip = true)
# start switches
discard await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[0].addObserver(obs0)
nodes[1].addObserver(obs1)
@@ -275,38 +248,22 @@ suite "GossipSub":
validatedCounter == 1
sendCounter == 2
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
asyncTest "GossipSub unsub - resub faster than backoff":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
# For this test to work we'd require a way to disable fanout.
# There's not a way to toggle it, and mocking it didn't work as there's not a reliable mock available.
skip()
return
# Instantiate handlers and validators
var handlerFut0 = newFuture[bool]()
proc handler0(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
handlerFut.complete(true)
handlerFut0.complete(true)
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar")
await allFuturesThrowing(subs)
nodes[0].unsubscribe("foobar", handler)
nodes[0].subscribe("foobar", handler)
# regular backoff is 60 seconds, so we must not wait that long
await (
waitSub(nodes[0], nodes[1], "foobar") and waitSub(nodes[1], nodes[0], "foobar")
).wait(30.seconds)
var handlerFut1 = newFuture[bool]()
proc handler1(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
handlerFut1.complete(true)
var validatorFut = newFuture[bool]()
proc validator(
@@ -316,26 +273,69 @@ suite "GossipSub":
validatorFut.complete(true)
result = ValidationResult.Accept
# Setup nodes and start switches
let
nodes = generateNodes(2, gossip = true, unsubscribeBackoff = 5.seconds)
topic = "foobar"
# Connect nodes
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
# Subscribe both nodes to the topic and node1 (receiver) to the validator
nodes[0].subscribe(topic, handler0)
nodes[1].subscribe(topic, handler1)
nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
await sleepAsync(DURATION_TIMEOUT)
check (await validatorFut) and (await handlerFut)
# Wait for both nodes to verify others' subscription
var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], topic)
subs &= waitSub(nodes[0], nodes[1], topic)
await allFuturesThrowing(subs)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
# When unsubscribing and resubscribing in a short time frame, the backoff period should be triggered
nodes[1].unsubscribe(topic, handler1)
await sleepAsync(DURATION_TIMEOUT)
nodes[1].subscribe(topic, handler1)
await sleepAsync(DURATION_TIMEOUT)
await allFuturesThrowing(nodesFut.concat())
# Backoff is set to 5 seconds, and the amount of sleeping time since the unsubsribe until now is 3-4s~
# Meaning, the subscription shouldn't have been processed yet because it's still in backoff period
# When publishing under this condition
discard await nodes[0].publish("foobar", "Hello!".toBytes())
await sleepAsync(DURATION_TIMEOUT)
# Then the message should not be received:
check:
validatorFut.toState().isPending()
handlerFut1.toState().isPending()
handlerFut0.toState().isPending()
validatorFut.reset()
handlerFut0.reset()
handlerFut1.reset()
# If we wait backoff period to end, around 1-2s
await waitForMesh(nodes[0], nodes[1], topic, 3.seconds)
discard await nodes[0].publish("foobar", "Hello!".toBytes())
await sleepAsync(DURATION_TIMEOUT)
# Then the message should be received
check:
validatorFut.toState().isCompleted()
handlerFut1.toState().isCompleted()
handlerFut0.toState().isPending()
asyncTest "e2e - GossipSub should add remote peer topic subscriptions":
proc handler(topic: string, data: seq[byte]) {.async.} =
discard
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[1].subscribe("foobar", handler)
@@ -347,21 +347,14 @@ suite "GossipSub":
"foobar" in gossip1.gossipsub
gossip1.gossipsub.hasPeerId("foobar", gossip2.peerInfo.peerId)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed":
proc handler(topic: string, data: seq[byte]) {.async.} =
discard
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -389,23 +382,16 @@ suite "GossipSub":
gossip2.gossipsub.hasPeerId("foobar", gossip1.peerInfo.peerId) or
gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub send over fanout A -> B":
var passed = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
passed.complete()
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@@ -436,9 +422,6 @@ suite "GossipSub":
await passed.wait(2.seconds)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
check observed == 2
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
@@ -447,17 +430,15 @@ suite "GossipSub":
check topic == "foobar"
passed.complete()
let
nodes = generateNodes(2, gossip = true, unsubscribeBackoff = 10.minutes)
let nodes = generateNodes(2, gossip = true, unsubscribeBackoff = 10.minutes)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
GossipSub(nodes[1]).parameters.d = 0
GossipSub(nodes[1]).parameters.dHigh = 0
GossipSub(nodes[1]).parameters.dLow = 0
await subscribeNodes(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -481,23 +462,16 @@ suite "GossipSub":
trace "test done, stopping..."
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub send over mesh A -> B":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
passed.complete(true)
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -518,24 +492,15 @@ suite "GossipSub":
gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId)
not gossip2.fanout.hasPeerId("foobar", gossip1.peerInfo.peerId)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub should not send to source & peers who already seen":
# 3 nodes: A, B, C
# A publishes, C relays, B is having a long validation
# so B should not send to anyone
let
nodes = generateNodes(3, gossip = true)
let nodes = generateNodes(3, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(), nodes[1].switch.start(), nodes[2].switch.start()
)
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
var cRelayed: Future[void] = newFuture[void]()
var bFinished: Future[void] = newFuture[void]()
@@ -593,30 +558,22 @@ suite "GossipSub":
await bFinished
await allFuturesThrowing(
nodes[0].switch.stop(), nodes[1].switch.stop(), nodes[2].switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub send over floodPublish A -> B":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
passed.complete(true)
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
var gossip1: GossipSub = GossipSub(nodes[0])
gossip1.parameters.floodPublish = true
var gossip2: GossipSub = GossipSub(nodes[1])
gossip2.parameters.floodPublish = true
await subscribeNodes(nodes)
await connectNodesStar(nodes)
# nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -631,20 +588,10 @@ suite "GossipSub":
"foobar" notin gossip2.gossipsub
not gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
# Helper procedures to avoid repetition
proc setupNodes(count: int): seq[PubSub] =
generateNodes(count, gossip = true)
proc startNodes(nodes: seq[PubSub]) {.async.} =
await allFuturesThrowing(nodes.mapIt(it.switch.start()))
proc stopNodes(nodes: seq[PubSub]) {.async.} =
await allFuturesThrowing(nodes.mapIt(it.switch.stop()))
proc connectNodes(nodes: seq[PubSub], target: PubSub) {.async.} =
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
@@ -689,10 +636,9 @@ suite "GossipSub":
gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)
await startNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodes(nodes[1 ..^ 1], nodes[0])
await baseTestProcedure(nodes, gossip1, gossip1.parameters.dLow, 17)
await stopNodes(nodes)
asyncTest "e2e - GossipSub floodPublish limit with bandwidthEstimatebps = 0":
let
@@ -703,19 +649,17 @@ suite "GossipSub":
gossip1.parameters.heartbeatInterval = milliseconds(700)
gossip1.parameters.bandwidthEstimatebps = 0
await startNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodes(nodes[1 ..^ 1], nodes[0])
await baseTestProcedure(nodes, gossip1, nodes.len - 1, nodes.len - 1)
await stopNodes(nodes)
asyncTest "e2e - GossipSub with multiple peers":
var runs = 10
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
let nodes = generateNodes(runs, gossip = true, triggerSelf = true)
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
var seen: Table[string, int]
var seenFut = newFuture[void]()
@@ -751,18 +695,13 @@ suite "GossipSub":
check:
"foobar" in gossip.gossipsub
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "e2e - GossipSub with multiple peers (sparse)":
var runs = 10
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
let nodes = generateNodes(runs, gossip = true, triggerSelf = true)
await subscribeSparseNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesSparse(nodes)
var seen: Table[string, int]
var seenFut = newFuture[void]()
@@ -805,10 +744,6 @@ suite "GossipSub":
gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "e2e - GossipSub peer exchange":
# A, B & C are subscribed to something
# B unsubcribe from it, it should send
@@ -818,22 +753,18 @@ suite "GossipSub":
proc handler(topic: string, data: seq[byte]) {.async.} =
discard # not used in this test
let
nodes =
generateNodes(2, gossip = true, enablePX = true) &
generateNodes(1, gossip = true, sendSignedPeerRecord = true)
let nodes =
generateNodes(2, gossip = true, enablePX = true) &
generateNodes(1, gossip = true, sendSignedPeerRecord = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(), nodes[1].switch.start(), nodes[2].switch.start()
)
startNodesAndDeferStop(nodes)
var
gossip0 = GossipSub(nodes[0])
gossip1 = GossipSub(nodes[1])
gossip2 = GossipSub(nodes[1])
gossip2 = GossipSub(nodes[2])
await subscribeNodes(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -843,24 +774,39 @@ suite "GossipSub":
if x != y:
await waitSub(nodes[x], nodes[y], "foobar")
var passed: Future[void] = newFuture[void]()
# Setup record handlers for all nodes
var
passed0: Future[void] = newFuture[void]()
passed2: Future[void] = newFuture[void]()
gossip0.routingRecordsHandler.add(
proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
check:
tag == "foobar"
peers.len == 2
peers[0].record.isSome() xor peers[1].record.isSome()
passed.complete()
passed0.complete()
)
gossip1.routingRecordsHandler.add(
proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
raiseAssert "should not get here"
)
gossip2.routingRecordsHandler.add(
proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
check:
tag == "foobar"
peers.len == 2
peers[0].record.isSome() xor peers[1].record.isSome()
passed2.complete()
)
# Unsubscribe from the topic
nodes[1].unsubscribe("foobar", handler)
await passed.wait(5.seconds)
await allFuturesThrowing(
nodes[0].switch.stop(), nodes[1].switch.stop(), nodes[2].switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
# Then verify what nodes receive the PX
let results = await waitForStates(@[passed0, passed2])
check:
results[0].isCompleted()
results[1].isCompleted()
asyncTest "e2e - iDontWant":
# 3 nodes: A <=> B <=> C
@@ -869,12 +815,9 @@ suite "GossipSub":
# We also check that B sends IDONTWANT to C, but not A
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
ok(newSeq[byte](10))
let
nodes = generateNodes(3, gossip = true, msgIdProvider = dumbMsgIdProvider)
let nodes = generateNodes(3, gossip = true, msgIdProvider = dumbMsgIdProvider)
nodesFut = await allFinished(
nodes[0].switch.start(), nodes[1].switch.start(), nodes[2].switch.start()
)
startNodesAndDeferStop(nodes)
await nodes[0].switch.connect(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
@@ -926,24 +869,14 @@ suite "GossipSub":
check:
toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 0)
await allFuturesThrowing(
nodes[0].switch.stop(), nodes[1].switch.stop(), nodes[2].switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - iDontWant is broadcasted on publish":
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
ok(newSeq[byte](10))
let
nodes = generateNodes(
2,
gossip = true,
msgIdProvider = dumbMsgIdProvider,
sendIDontWantOnPublish = true,
)
let nodes = generateNodes(
2, gossip = true, msgIdProvider = dumbMsgIdProvider, sendIDontWantOnPublish = true
)
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
await nodes[0].switch.connect(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
@@ -966,10 +899,6 @@ suite "GossipSub":
checkUntilTimeout:
gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - iDontWant is sent only for 1.2":
# 3 nodes: A <=> B <=> C
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,
@@ -987,9 +916,7 @@ suite "GossipSub":
gossipSubVersion = GossipSubCodec_11,
)[0]
let nodesFut = await allFinished(
nodeA.switch.start(), nodeB.switch.start(), nodeC.switch.start()
)
startNodesAndDeferStop(@[nodeA, nodeB, nodeC])
await nodeA.switch.connect(
nodeB.switch.peerInfo.peerId, nodeB.switch.peerInfo.addrs
@@ -1028,19 +955,12 @@ suite "GossipSub":
toSeq(gossipC.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 0)
toSeq(gossipA.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 0)
await allFuturesThrowing(
nodeA.switch.stop(), nodeB.switch.stop(), nodeC.switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
proc initializeGossipTest(): Future[(seq[PubSub], GossipSub, GossipSub)] {.async.} =
let nodes =
generateNodes(2, gossip = true, overheadRateLimit = Opt.some((20, 1.millis)))
discard await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await subscribeNodes(nodes)
await startNodes(nodes)
await connectNodesStar(nodes)
proc handle(topic: string, data: seq[byte]) {.async.} =
discard
@@ -1213,7 +1133,7 @@ suite "GossipSub":
gossipSubVersion = GossipSubCodec_10,
)[0]
let nodesFut = await allFinished(node0.switch.start(), node1.switch.start())
startNodesAndDeferStop(@[node0, node1])
await node0.switch.connect(
node1.switch.peerInfo.peerId, node1.switch.peerInfo.addrs
@@ -1233,7 +1153,3 @@ suite "GossipSub":
gossip0.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
checkUntilTimeout:
gossip1.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
await allFuturesThrowing(node0.switch.stop(), node1.switch.stop())
await allFuturesThrowing(nodesFut.concat())

View File

@@ -25,8 +25,9 @@ import
protocols/pubsub/pubsubpeer,
protocols/pubsub/peertable,
protocols/pubsub/rpc/messages,
]
import ../helpers
],
../utils/[futures, async_tests],
../helpers
template tryPublish(
call: untyped, require: int, wait = 10.milliseconds, timeout = 10.seconds
@@ -47,11 +48,10 @@ suite "GossipSub":
asyncTest "e2e - GossipSub with multiple peers - control deliver (sparse)":
var runs = 10
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
let nodes = generateNodes(runs, gossip = true, triggerSelf = true)
await subscribeSparseNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesSparse(nodes)
var seen: Table[string, int]
var seenFut = newFuture[void]()
@@ -90,21 +90,15 @@ suite "GossipSub":
check:
v >= 1
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "GossipSub invalid topic subscription":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
# We must subscribe before setting the validator
nodes[0].subscribe("foobar", handler)
@@ -121,19 +115,16 @@ suite "GossipSub":
else:
true
await subscribeNodes(nodes)
await connectNodesStar(nodes)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub test directPeers":
let nodes = generateNodes(2, gossip = true)
await allFutures(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
await GossipSub(nodes[0]).addDirectPeer(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
)
@@ -150,7 +141,7 @@ suite "GossipSub":
true
# DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN
### await subscribeNodes(nodes)
### await connectNodesStar(nodes)
proc handler(topic: string, data: seq[byte]) {.async.} =
discard
@@ -159,16 +150,10 @@ suite "GossipSub":
await invalidDetected.wait(10.seconds)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
asyncTest "GossipSub directPeers: always forward messages":
let
nodes = generateNodes(3, gossip = true)
let nodes = generateNodes(3, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(), nodes[1].switch.start(), nodes[2].switch.start()
)
startNodesAndDeferStop(nodes)
await GossipSub(nodes[0]).addDirectPeer(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
@@ -204,18 +189,10 @@ suite "GossipSub":
check "foobar" notin GossipSub(nodes[1]).mesh
check "foobar" notin GossipSub(nodes[2]).mesh
await allFuturesThrowing(
nodes[0].switch.stop(), nodes[1].switch.stop(), nodes[2].switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub directPeers: don't kick direct peer with low score":
let
nodes = generateNodes(2, gossip = true)
let nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
await GossipSub(nodes[0]).addDirectPeer(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
@@ -252,18 +229,58 @@ suite "GossipSub":
# Without directPeers, this would fail
await handlerFut.wait(1.seconds)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
asyncTest "GossipSub directPeers: send message to unsubscribed direct peer":
# Given 2 nodes
let
numberOfNodes = 2
nodes = generateNodes(numberOfNodes, gossip = true)
node0 = nodes[0]
node1 = nodes[1]
g0 = GossipSub(node0)
g1 = GossipSub(node1)
await allFuturesThrowing(nodesFut.concat())
startNodesAndDeferStop(nodes)
# With message observers
var
messageReceived0 = newFuture[bool]()
messageReceived1 = newFuture[bool]()
proc observer0(peer: PubSubPeer, msgs: var RPCMsg) =
for message in msgs.messages:
if message.topic == "foobar":
messageReceived0.complete(true)
proc observer1(peer: PubSubPeer, msgs: var RPCMsg) =
for message in msgs.messages:
if message.topic == "foobar":
messageReceived1.complete(true)
node0.addObserver(PubSubObserver(onRecv: observer0))
node1.addObserver(PubSubObserver(onRecv: observer1))
# Connect them as direct peers
await g0.addDirectPeer(node1.peerInfo.peerId, node1.peerInfo.addrs)
await g1.addDirectPeer(node0.peerInfo.peerId, node0.peerInfo.addrs)
# When node 0 sends a message
let message = "Hello!".toBytes()
let publishResult = await node0.publish("foobar", message)
# None should receive the message as they are not subscribed to the topic
let results = await waitForStates(@[messageReceived0, messageReceived1])
check:
publishResult == 0
results[0].isPending()
results[1].isPending()
asyncTest "GossipSub peers disconnections mechanics":
var runs = 10
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
let nodes = generateNodes(runs, gossip = true, triggerSelf = true)
await subscribeNodes(nodes)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
var seen: Table[string, int]
var seenFut = newFuture[void]()
@@ -342,10 +359,6 @@ suite "GossipSub":
check:
GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "GossipSub scoring - decayInterval":
let nodes = generateNodes(2, gossip = true)
@@ -355,15 +368,13 @@ suite "GossipSub":
# sleeps to be safe here
gossip.parameters.decayInterval = 300.milliseconds
let
# start switches
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
startNodesAndDeferStop(nodes)
var handlerFut = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async.} =
handlerFut.complete()
await subscribeNodes(nodes)
await connectNodesStar(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
@@ -381,7 +392,3 @@ suite "GossipSub":
check:
gossip.peerStats[nodes[1].peerInfo.peerId].topicInfos["foobar"].meshMessageDeliveries in
50.0 .. 66.0
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())

View File

@@ -0,0 +1,361 @@
# Nim-Libp2p
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.used.}
import sequtils, tables, sets, sugar
import chronos, stew/byteutils
import chronicles
import metrics
import
utils,
../../libp2p/[
protocols/pubsub/pubsub,
protocols/pubsub/gossipsub,
protocols/pubsub/peertable,
protocols/pubsub/rpc/messages,
]
import ../helpers, ../utils/[futures]
from ../../libp2p/protocols/pubsub/mcache import window
proc voidTopicHandler(topic: string, data: seq[byte]) {.async.} =
discard
proc createCompleteHandler(): (
Future[bool], proc(topic: string, data: seq[byte]) {.async.}
) =
var fut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
fut.complete(true)
return (fut, handler)
proc addIHaveObservers(nodes: seq[auto], topic: string, receivedIHaves: ref seq[int]) =
let numberOfNodes = nodes.len
receivedIHaves[] = repeat(0, numberOfNodes)
for i in 0 ..< numberOfNodes:
var pubsubObserver: PubSubObserver
capture i:
let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
let iHave = msgs.control.get.ihave
if iHave.len > 0:
for msg in iHave:
if msg.topicID == topic:
receivedIHaves[i] += 1
pubsubObserver = PubSubObserver(onRecv: checkForIhaves)
nodes[i].addObserver(pubsubObserver)
proc addIDontWantObservers(nodes: seq[auto], receivedIDontWants: ref seq[int]) =
let numberOfNodes = nodes.len
receivedIDontWants[] = repeat(0, numberOfNodes)
for i in 0 ..< numberOfNodes:
var pubsubObserver: PubSubObserver
capture i:
let checkForIDontWant = proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
let iDontWant = msgs.control.get.idontwant
if iDontWant.len > 0:
receivedIDontWants[i] += 1
pubsubObserver = PubSubObserver(onRecv: checkForIDontWant)
nodes[i].addObserver(pubsubObserver)
suite "Gossipsub Parameters":
teardown:
checkTrackers()
asyncTest "dont prune peers if mesh len is less than d_high":
let
numberOfNodes = 5
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitSubAllNodes(nodes, topic)
let expectedNumberOfPeers = numberOfNodes - 1
for i in 0 ..< numberOfNodes:
var gossip = GossipSub(nodes[i])
check:
gossip.gossipsub[topic].len == expectedNumberOfPeers
gossip.mesh[topic].len == expectedNumberOfPeers
gossip.fanout.len == 0
asyncTest "prune peers if mesh len is higher than d_high":
let
numberofNodes = 15
topic = "foobar"
nodes = generateNodes(numberofNodes, gossip = true)
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitSubAllNodes(nodes, topic)
# Give it time for a heartbeat
await sleepAsync(DURATION_TIMEOUT_EXTENDED)
let
expectedNumberOfPeers = numberofNodes - 1
dHigh = 12
d = 6
dLow = 4
for i in 0 ..< numberofNodes:
var gossip = GossipSub(nodes[i])
check:
gossip.gossipsub[topic].len == expectedNumberOfPeers
gossip.mesh[topic].len >= dLow and gossip.mesh[topic].len <= dHigh
gossip.fanout.len == 0
asyncTest "messages sent to peers not in the mesh are propagated via gossip":
let
numberOfNodes = 5
topic = "foobar"
dValues = DValues(dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1))
nodes = generateNodes(numberOfNodes, gossip = true, dValues = some(dValues))
startNodesAndDeferStop(nodes)
# All nodes are checking for iHave messages
var receivedIHavesRef = new seq[int]
addIHaveObservers(nodes, topic, receivedIHavesRef)
# And are interconnected
await connectNodesStar(nodes)
# And subscribed to the same topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a message
check (await nodes[0].publish(topic, "Hello!".toBytes())) > 0
await sleepAsync(DURATION_TIMEOUT)
# At least one of the nodes should have received an iHave message
# The check is made this way because the mesh structure changes from run to run
let receivedIHaves = receivedIHavesRef[]
check:
anyIt(receivedIHavesRef[], it > 0)
asyncTest "messages are not sent back to source or forwarding peer":
let
numberOfNodes = 3
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true)
startNodesAndDeferStop(nodes)
let (handlerFut0, handler0) = createCompleteHandler()
let (handlerFut1, handler1) = createCompleteHandler()
let (handlerFut2, handler2) = createCompleteHandler()
# Nodes are connected in a ring
await connectNodes(nodes[0], nodes[1])
await connectNodes(nodes[1], nodes[2])
await connectNodes(nodes[2], nodes[0])
# And subscribed to the same topic
subscribeAllNodes(nodes, topic, @[handler0, handler1, handler2])
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a message
check (await nodes[0].publish(topic, "Hello!".toBytes())) == 2
await sleepAsync(DURATION_TIMEOUT)
# Nodes 1 and 2 should receive the message, but node 0 shouldn't receive it back
let results = await waitForStates(@[handlerFut0, handlerFut1, handlerFut2])
check:
results[0].isPending()
results[1].isCompleted()
results[2].isCompleted()
asyncTest "flood publish to all peers with score above threshold, regardless of subscription":
let
numberOfNodes = 3
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true, floodPublish = true)
g0 = GossipSub(nodes[0])
startNodesAndDeferStop(nodes)
# Nodes 1 and 2 are connected to node 0
await connectNodes(nodes[0], nodes[1])
await connectNodes(nodes[0], nodes[2])
let (handlerFut1, handler1) = createCompleteHandler()
let (handlerFut2, handler2) = createCompleteHandler()
# Nodes are subscribed to the same topic
nodes[1].subscribe(topic, handler1)
nodes[2].subscribe(topic, handler2)
await sleepAsync(1.seconds)
# Given node 2's score is below the threshold
for peer in g0.gossipsub.getOrDefault(topic):
if peer.peerId == nodes[2].peerInfo.peerId:
peer.score = (g0.parameters.publishThreshold - 1)
# When node 0 publishes a message to topic "foo"
let message = "Hello!".toBytes()
check (await nodes[0].publish(topic, message)) == 1
await sleepAsync(3.seconds)
# Then only node 1 should receive the message
let results = await waitForStates(@[handlerFut1, handlerFut2])
check:
results[0].isCompleted(true)
results[1].isPending()
asyncTest "adaptive gossip dissemination, dLazy and gossipFactor to 0":
let
numberOfNodes = 20
topic = "foobar"
dValues = DValues(
dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(0)
)
nodes = generateNodes(
numberOfNodes,
gossip = true,
dValues = some(dValues),
gossipFactor = some(0.float),
)
startNodesAndDeferStop(nodes)
# All nodes are checking for iHave messages
var receivedIHavesRef = new seq[int]
addIHaveObservers(nodes, topic, receivedIHavesRef)
# And are connected to node 0
for i in 1 ..< numberOfNodes:
await connectNodes(nodes[0], nodes[i])
# And subscribed to the same topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a message
check (await nodes[0].publish(topic, "Hello!".toBytes())) == 3
await sleepAsync(DURATION_TIMEOUT)
# None of the nodes should have received an iHave message
let receivedIHaves = receivedIHavesRef[]
check:
filterIt(receivedIHaves, it > 0).len == 0
asyncTest "adaptive gossip dissemination, with gossipFactor priority":
let
numberOfNodes = 20
topic = "foobar"
dValues = DValues(
dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(4)
)
nodes = generateNodes(
numberOfNodes, gossip = true, dValues = some(dValues), gossipFactor = some(0.5)
)
startNodesAndDeferStop(nodes)
# All nodes are checking for iHave messages
var receivedIHavesRef = new seq[int]
addIHaveObservers(nodes, topic, receivedIHavesRef)
# And are connected to node 0
for i in 1 ..< numberOfNodes:
await connectNodes(nodes[0], nodes[i])
# And subscribed to the same topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a message
check (await nodes[0].publish(topic, "Hello!".toBytes())) == 3
await sleepAsync(DURATION_TIMEOUT)
# At least 8 of the nodes should have received an iHave message
# That's because the gossip factor is 0.5 over 16 available nodes
let receivedIHaves = receivedIHavesRef[]
check:
filterIt(receivedIHaves, it > 0).len >= 8
asyncTest "adaptive gossip dissemination, with dLazy priority":
let
numberOfNodes = 20
topic = "foobar"
dValues = DValues(
dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(6)
)
nodes = generateNodes(
numberOfNodes,
gossip = true,
dValues = some(dValues),
gossipFactor = some(0.float),
)
startNodesAndDeferStop(nodes)
# All nodes are checking for iHave messages
var receivedIHavesRef = new seq[int]
addIHaveObservers(nodes, topic, receivedIHavesRef)
# And are connected to node 0
for i in 1 ..< numberOfNodes:
await connectNodes(nodes[0], nodes[i])
# And subscribed to the same topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a message
check (await nodes[0].publish(topic, "Hello!".toBytes())) == 3
await sleepAsync(DURATION_TIMEOUT)
# At least 6 of the nodes should have received an iHave message
# That's because the dLazy is 6
let receivedIHaves = receivedIHavesRef[]
check:
filterIt(receivedIHaves, it > 0).len == dValues.dLazy.get()
asyncTest "iDontWant messages are broadcast immediately after receiving the first message instance":
let
numberOfNodes = 3
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true)
startNodesAndDeferStop(nodes)
# All nodes are checking for iDontWant messages
var receivedIDontWantsRef = new seq[int]
addIDontWantObservers(nodes, receivedIDontWantsRef)
# And are connected in a line
await connectNodes(nodes[0], nodes[1])
await connectNodes(nodes[1], nodes[2])
# And subscribed to the same topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await sleepAsync(DURATION_TIMEOUT)
# When node 0 sends a large message
let largeMsg = newSeq[byte](1000)
check (await nodes[0].publish(topic, largeMsg)) == 1
await sleepAsync(DURATION_TIMEOUT)
# Only node 2 should have received the iDontWant message
let receivedIDontWants = receivedIDontWantsRef[]
check:
receivedIDontWants[0] == 0
receivedIDontWants[1] == 0
receivedIDontWants[2] == 1

View File

@@ -1,4 +1,5 @@
{.used.}
import
testfloodsub, testgossipsub, testgossipsub2, testmcache, testtimedcache, testmessage
testfloodsub, testgossipsub, testgossipsub2, testgossipsubparameters, testmcache,
testtimedcache, testmessage

View File

@@ -24,7 +24,15 @@ export builders
randomize()
type TestGossipSub* = ref object of GossipSub
type
TestGossipSub* = ref object of GossipSub
DValues* = object
d*: Option[int]
dLow*: Option[int]
dHigh*: Option[int]
dScore*: Option[int]
dOut*: Option[int]
dLazy*: Option[int]
proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
proc getConn(): Future[Connection] {.
@@ -62,6 +70,24 @@ func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] =
$m.data.hash & $m.topic.hash
ok mid.toBytes()
proc applyDValues(parameters: var GossipSubParams, dValues: Option[DValues]) =
if dValues.isNone:
return
let values = dValues.get
# Apply each value if it exists
if values.d.isSome:
parameters.d = values.d.get
if values.dLow.isSome:
parameters.dLow = values.dLow.get
if values.dHigh.isSome:
parameters.dHigh = values.dHigh.get
if values.dScore.isSome:
parameters.dScore = values.dScore.get
if values.dOut.isSome:
parameters.dOut = values.dOut.get
if values.dLazy.isSome:
parameters.dLazy = values.dLazy.get
proc generateNodes*(
num: Natural,
secureManagers: openArray[SecureProtocol] = [SecureProtocol.Noise],
@@ -79,6 +105,9 @@ proc generateNodes*(
Opt.none(tuple[bytes: int, interval: Duration]),
gossipSubVersion: string = "",
sendIDontWantOnPublish: bool = false,
floodPublish: bool = false,
dValues: Option[DValues] = DValues.none(),
gossipFactor: Option[float] = float.none(),
): seq[PubSub] =
for i in 0 ..< num:
let switch = newStandardSwitch(
@@ -96,13 +125,15 @@ proc generateNodes*(
maxMessageSize = maxMessageSize,
parameters = (
var p = GossipSubParams.init()
p.floodPublish = false
p.floodPublish = floodPublish
p.historyLength = 20
p.historyGossip = 20
p.unsubscribeBackoff = unsubscribeBackoff
p.enablePX = enablePX
p.overheadRateLimit = overheadRateLimit
p.sendIDontWantOnPublish = sendIDontWantOnPublish
if gossipFactor.isSome: p.gossipFactor = gossipFactor.get
applyDValues(p, dValues)
p
),
)
@@ -127,13 +158,18 @@ proc generateNodes*(
switch.mount(pubsub)
result.add(pubsub)
proc subscribeNodes*(nodes: seq[PubSub]) {.async.} =
proc connectNodes*(dialer: PubSub, target: PubSub) {.async.} =
doAssert dialer.switch.peerInfo.peerId != target.switch.peerInfo.peerId,
"Could not connect same peer"
await dialer.switch.connect(target.peerInfo.peerId, target.peerInfo.addrs)
proc connectNodesStar*(nodes: seq[PubSub]) {.async.} =
for dialer in nodes:
for node in nodes:
if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
await connectNodes(dialer, node)
proc subscribeSparseNodes*(nodes: seq[PubSub], degree: int = 2) {.async.} =
proc connectNodesSparse*(nodes: seq[PubSub], degree: int = 2) {.async.} =
if nodes.len < degree:
raise
(ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
@@ -143,18 +179,14 @@ proc subscribeSparseNodes*(nodes: seq[PubSub], degree: int = 2) {.async.} =
continue
for node in nodes:
if dialer.switch.peerInfo.peerId != node.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
await connectNodes(dialer, node)
proc subscribeRandom*(nodes: seq[PubSub]) {.async.} =
for dialer in nodes:
var dialed: seq[PeerId]
while dialed.len < nodes.len - 1:
let node = sample(nodes)
if node.peerInfo.peerId notin dialed:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
dialed.add(node.peerInfo.peerId)
proc activeWait(
interval: Duration, maximum: Moment, timeoutErrorMessage = "Timeout on activeWait"
) {.async.} =
await sleepAsync(interval)
doAssert Moment.now() < maximum, timeoutErrorMessage
proc waitSub*(sender, receiver: auto, key: string) {.async.} =
if sender == receiver:
@@ -177,10 +209,14 @@ proc waitSub*(sender, receiver: auto, key: string) {.async.} =
)
:
trace "waitSub sleeping..."
await activeWait(5.milliseconds, timeout, "waitSub timeout!")
# await
await sleepAsync(5.milliseconds)
doAssert Moment.now() < timeout, "waitSub timeout!"
proc waitSubAllNodes*(nodes: seq[auto], topic: string) {.async.} =
let numberOfNodes = nodes.len
for x in 0 ..< numberOfNodes:
for y in 0 ..< numberOfNodes:
if x != y:
await waitSub(nodes[x], nodes[y], topic)
proc waitSubGraph*(nodes: seq[PubSub], key: string) {.async.} =
let timeout = Moment.now() + 5.seconds
@@ -207,6 +243,43 @@ proc waitSubGraph*(nodes: seq[PubSub], key: string) {.async.} =
if ok == nodes.len:
return
trace "waitSubGraph sleeping..."
await activeWait(5.milliseconds, timeout, "waitSubGraph timeout!")
await sleepAsync(5.milliseconds)
doAssert Moment.now() < timeout, "waitSubGraph timeout!"
proc waitForMesh*(
sender: auto, receiver: auto, key: string, timeoutDuration = 5.seconds
) {.async.} =
if sender == receiver:
return
let
timeoutMoment = Moment.now() + timeoutDuration
gossipsubSender = GossipSub(sender)
receiverPeerId = receiver.peerInfo.peerId
while not gossipsubSender.mesh.hasPeerId(key, receiverPeerId):
trace "waitForMesh sleeping..."
await activeWait(5.milliseconds, timeoutMoment, "waitForMesh timeout!")
proc startNodes*(nodes: seq[PubSub]) {.async.} =
await allFuturesThrowing(nodes.mapIt(it.switch.start()))
proc stopNodes*(nodes: seq[PubSub]) {.async.} =
await allFuturesThrowing(nodes.mapIt(it.switch.stop()))
template startNodesAndDeferStop*(nodes: seq[PubSub]): untyped =
await startNodes(nodes)
defer:
await stopNodes(nodes)
proc subscribeAllNodes*(nodes: seq[PubSub], topic: string, topicHandler: TopicHandler) =
for node in nodes:
node.subscribe(topic, topicHandler)
proc subscribeAllNodes*(
nodes: seq[PubSub], topic: string, topicHandlers: seq[TopicHandler]
) =
if nodes.len != topicHandlers.len:
raise (ref CatchableError)(msg: "nodes and topicHandlers count needs to match!")
for i in 0 ..< nodes.len:
nodes[i].subscribe(topic, topicHandlers[i])

View File

@@ -18,7 +18,7 @@ import
discovery/discoverymngr,
discovery/rendezvousinterface,
]
import ./helpers
import ./helpers, ./utils/[futures, async_tests]
proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
SwitchBuilder

61
tests/utils/futures.nim Normal file
View File

@@ -0,0 +1,61 @@
import chronos/futures, stew/results, chronos, sequtils
const
DURATION_TIMEOUT* = 1.seconds
DURATION_TIMEOUT_EXTENDED* = 1500.milliseconds
type FutureStateWrapper*[T] = object
future: Future[T]
state: FutureState
when T is void:
discard
else:
value: T
proc isPending*(wrapper: FutureStateWrapper): bool =
wrapper.state == Pending
proc isCompleted*(wrapper: FutureStateWrapper): bool =
wrapper.state == Completed
proc isCompleted*[T](wrapper: FutureStateWrapper[T], expectedValue: T): bool =
when T is void:
wrapper.state == Completed
else:
wrapper.state == Completed and wrapper.value == expectedValue
proc isCancelled*(wrapper: FutureStateWrapper): bool =
wrapper.state == Cancelled
proc isFailed*(wrapper: FutureStateWrapper): bool =
wrapper.state == Failed
proc toState*[T](future: Future[T]): FutureStateWrapper[T] =
var wrapper: FutureStateWrapper[T]
wrapper.future = future
if future.cancelled():
wrapper.state = Cancelled
elif future.finished():
if future.failed():
wrapper.state = Failed
else:
wrapper.state = Completed
when T isnot void:
wrapper.value = future.read()
else:
wrapper.state = Pending
return wrapper
proc waitForState*[T](
future: Future[T], timeout = DURATION_TIMEOUT
): Future[FutureStateWrapper[T]] {.async.} =
discard await future.withTimeout(timeout)
return future.toState()
proc waitForStates*[T](
futures: seq[Future[T]], timeout = DURATION_TIMEOUT
): Future[seq[FutureStateWrapper[T]]] {.async.} =
await sleepAsync(timeout)
return futures.mapIt(it.toState())