test(gossipsub): heartbeat tests (#1391)

Radosław Kamiński
2025-05-27 10:28:12 +01:00
committed by GitHub
parent 7275f6f9c3
commit 4e6f4af601
3 changed files with 383 additions and 52 deletions

View File

@@ -73,7 +73,7 @@ suite "GossipSub Control Messages":
iwants.messageIDs.len == 1
gossipSub.mcache.msgs.len == 1
asyncTest "handleIWant - Peers with budget should request messages ":
asyncTest "handleIWant - peers with budget should request messages":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
defer:

View File

@@ -0,0 +1,313 @@
{.used.}
import std/[sequtils]
import utils
import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
import ../helpers
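# Tests exercising GossipSub's periodic heartbeat: mesh rebalancing (pruning,
# grafting, opportunistic grafting), fanout maintenance, and the shifting of
# the iDontWants / sentIHaves history windows.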
suite "GossipSub Heartbeat":
teardown:
checkTrackers()
asyncTest "Mesh is rebalanced during heartbeat - pruning peers":
const
numberOfNodes = 10
topic = "foobar"
let
nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
node0 = nodes[0]
startNodesAndDeferStop(nodes)
# Observe timestamps of received prune messages
var lastPrune = Moment.now()
for i in 0 ..< numberOfNodes:
let pubsubObserver = PubSubObserver(
onRecv: proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
for msg in msgs.control.get.prune:
lastPrune = Moment.now()
)
nodes[i].addObserver(pubsubObserver)
# Nodes are connected to Node0
for i in 1 ..< numberOfNodes:
await connectNodes(node0, nodes[i])
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
check:
node0.mesh[topic].len == numberOfNodes - 1
    # When DValues of Node0 are updated to values lower than the defaults
const
newDLow = 2
newDHigh = 4
newDValues = some(
DValues(
dLow: some(newDLow),
dHigh: some(newDHigh),
d: some(3),
dLazy: some(3),
dScore: some(2),
dOut: some(2),
)
)
node0.parameters.applyDValues(newDValues)
    # Waiting 2 heartbeats to finish pruning (2 peers first, then 3)
    # Comparing timestamps of the last received prunes to confirm the heartbeat interval
await waitForHeartbeat()
    let after1stHeartbeat = lastPrune
    await waitForHeartbeat()
    let after2ndHeartbeat = lastPrune
    let measuredHeartbeat = after2ndHeartbeat - after1stHeartbeat
    let heartbeatDiff = measuredHeartbeat - TEST_GOSSIPSUB_HEARTBEAT_INTERVAL
# Then mesh of Node0 is rebalanced and peers are pruned to adapt to new values
check:
node0.mesh[topic].len >= newDLow and node0.mesh[topic].len <= newDHigh
heartbeatDiff < 2.milliseconds # 2ms margin
asyncTest "Mesh is rebalanced during heartbeat - grafting new peers":
const
numberOfNodes = 10
topic = "foobar"
dLow = 3
dHigh = 4
let
nodes = generateNodes(
numberOfNodes,
gossip = true,
dValues = some(
DValues(dLow: some(dLow), dHigh: some(dHigh), d: some(3), dOut: some(1))
),
pruneBackoff = 20.milliseconds,
)
.toGossipSub()
node0 = nodes[0]
startNodesAndDeferStop(nodes)
# Nodes are connected to Node0
for i in 1 ..< numberOfNodes:
await connectNodes(node0, nodes[i])
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
check:
node0.mesh[topic].len >= dLow and node0.mesh[topic].len <= dHigh
    # When all but one of Node0's mesh peers are disconnected
let peersToDisconnect = node0.mesh[topic].toSeq()[1 .. ^1].mapIt(it.peerId)
findAndUnsubscribePeers(nodes, peersToDisconnect, topic, voidTopicHandler)
# Then mesh of Node0 is rebalanced and new peers are added
await waitForHeartbeat()
check:
node0.mesh[topic].len >= dLow and node0.mesh[topic].len <= dHigh
node0.mesh[topic].toSeq().allIt(it.peerId notin peersToDisconnect)
asyncTest "Mesh is rebalanced during heartbeat - opportunistic grafting":
const
numberOfNodes = 10
topic = "foobar"
let
nodes = generateNodes(
numberOfNodes,
gossip = true,
dValues = some(
DValues(
dLow: some(3),
dHigh: some(4),
d: some(3),
dOut: some(1),
dLazy: some(3),
dScore: some(2),
)
),
pruneBackoff = 20.milliseconds,
opportunisticGraftThreshold = 600,
)
.toGossipSub()
node0 = nodes[0]
startNodesAndDeferStop(nodes)
# Nodes are connected to Node0
for i in 1 ..< numberOfNodes:
await connectNodes(node0, nodes[i])
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
# Keep track of initial mesh of Node0
let startingMesh = node0.mesh[topic].toSeq()
# When scores are assigned to Peers of Node0
var expectedGrafts: seq[PubSubPeer] = @[]
var score = 100.0
for peer in node0.gossipsub[topic]:
if peer in node0.mesh[topic]:
        # Assign increasing scores to peers in the starting mesh
peer.score = score
score += 100.0
else:
        # Assign above-median scores to peers outside the starting mesh and expect them to be grafted
peer.score = 800.0
expectedGrafts &= peer
    # Then during heartbeat, peers with below-median scores are pruned and at most 2 peers are grafted
await waitForHeartbeat()
let actualGrafts = node0.mesh[topic].toSeq().filterIt(it notin startingMesh)
const maxOpportunisticGraftsPerHeartbeat = 2
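    # Matches the gossipsub v1.1 spec's default cap on opportunistic grafting
    # (MaxOpportunisticGraftPeers = 2 per heartbeat).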
check:
actualGrafts.len == maxOpportunisticGraftsPerHeartbeat
actualGrafts.allIt(it in expectedGrafts)
asyncTest "Fanout maintenance during heartbeat - expired peers are dropped":
const
numberOfNodes = 10
topic = "foobar"
let nodes = generateNodes(numberOfNodes, gossip = true, fanoutTTL = 30.milliseconds)
.toGossipSub()
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
# All nodes but Node0 are subscribed to the topic
for node in nodes[1 .. ^1]:
node.subscribe(topic, voidTopicHandler)
await waitForHeartbeat()
# When Node0 sends a message to the topic
let node0 = nodes[0]
tryPublish await node0.publish(topic, newSeq[byte](10000)), 1
# Then Node0 fanout peers are populated
let maxFanoutPeers = node0.parameters.d
await waitForPeersInTable(node0, topic, maxFanoutPeers, PeerTableType.Fanout)
check:
node0.fanout.hasKey(topic) and node0.fanout[topic].len == maxFanoutPeers
# And after heartbeat (60ms) Node0 fanout peers are dropped (because fanoutTTL=30ms)
await waitForHeartbeat()
check:
not node0.fanout.hasKey(topic)
asyncTest "Fanout maintenance during heartbeat - fanout peers are replenished":
const
numberOfNodes = 10
topic = "foobar"
let
nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
node0 = nodes[0]
startNodesAndDeferStop(nodes)
await connectNodesStar(nodes)
# All nodes but Node0 are subscribed to the topic
for node in nodes[1 .. ^1]:
node.subscribe(topic, voidTopicHandler)
await waitForHeartbeat()
# When Node0 sends a message to the topic
tryPublish await node0.publish(topic, newSeq[byte](10000)), 1
# Then Node0 fanout peers are populated
let maxFanoutPeers = node0.parameters.d
await waitForPeersInTable(node0, topic, maxFanoutPeers, PeerTableType.Fanout)
    # When all but the first peer of Node0's fanout are disconnected
let peersToDisconnect = node0.fanout[topic].toSeq()[1 .. ^1].mapIt(it.peerId)
findAndUnsubscribePeers(nodes, peersToDisconnect, topic, voidTopicHandler)
await waitForPeersInTable(node0, topic, 1, PeerTableType.Fanout)
# Then Node0 fanout peers are replenished during heartbeat
await waitForHeartbeat()
check:
# expecting 10[numberOfNodes] - 1[Node0] - (6[maxFanoutPeers] - 1[first peer not disconnected]) = 4
node0.fanout[topic].len == numberOfNodes - 1 - (maxFanoutPeers - 1)
node0.fanout[topic].toSeq().allIt(it.peerId notin peersToDisconnect)
asyncTest "iDontWants history - last element is pruned during heartbeat":
const topic = "foobar"
let nodes = generateNodes(
2, gossip = true, sendIDontWantOnPublish = true, historyLength = 5
)
.toGossipSub()
startNodesAndDeferStop(nodes)
await connectNodes(nodes[0], nodes[1])
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
# When Node0 sends 10 messages to the topic
const msgCount = 10
for i in 0 ..< msgCount:
tryPublish await nodes[0].publish(topic, newSeq[byte](1000)), 1
# Then Node1 receives 10 iDontWant messages from Node0
let peer = nodes[1].mesh[topic].toSeq()[0]
check:
peer.iDontWants[^1].len == msgCount
# When heartbeat happens
await waitForHeartbeat()
# Then last element of iDontWants history is pruned
check:
peer.iDontWants[^1].len == 0
asyncTest "sentIHaves history - last element is pruned during heartbeat":
# 3 Nodes, Node 0 <==> Node 1 and Node 0 <==> Node 2
# due to DValues: 1 peer in mesh and 1 peer only in gossip of Node 0
const
numberOfNodes = 3
topic = "foobar"
let nodes = generateNodes(
numberOfNodes,
gossip = true,
historyLength = 3,
dValues =
some(DValues(dLow: some(1), dHigh: some(1), d: some(1), dOut: some(0))),
)
.toGossipSub()
startNodesAndDeferStop(nodes)
for i in 1 ..< numberOfNodes:
await connectNodes(nodes[0], nodes[i])
subscribeAllNodes(nodes, topic, voidTopicHandler)
# Waiting 2 heartbeats to populate sentIHaves history
await waitForHeartbeat(2)
    # When Node0 sends a message to the topic
tryPublish await nodes[0].publish(topic, newSeq[byte](1000)), 1
# Find Peer outside of mesh to which Node 0 will send IHave
let peer =
nodes[0].gossipsub[topic].toSeq().filterIt(it notin nodes[0].mesh[topic])[0]
# When next heartbeat occurs
# Then IHave is sent and sentIHaves is populated
waitForCondition(
peer.sentIHaves[^1].len > 0,
10.milliseconds,
100.milliseconds,
"wait for sentIHaves timeout",
)
# Need to clear mCache as node would keep populating sentIHaves
nodes[0].clearMCache()
# When next heartbeat occurs
await waitForHeartbeat()
# Then last element of sentIHaves history is pruned
check:
peer.sentIHaves[^1].len == 0

View File

@@ -168,6 +168,8 @@ proc generateNodes*(
sign: bool = libp2p_pubsub_sign,
sendSignedPeerRecord = false,
unsubscribeBackoff = 1.seconds,
+    pruneBackoff = 1.minutes,
+    fanoutTTL = 1.minutes,
maxMessageSize: int = 1024 * 1024,
enablePX: bool = false,
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
@@ -178,6 +180,8 @@ proc generateNodes*(
floodPublish: bool = false,
dValues: Option[DValues] = DValues.none(),
gossipFactor: Option[float] = float.none(),
+    opportunisticGraftThreshold: float = 0.0,
+    historyLength = 20,
): seq[PubSub] =
for i in 0 ..< num:
let switch = newStandardSwitch(
@@ -197,12 +201,15 @@ proc generateNodes*(
var p = GossipSubParams.init()
p.heartbeatInterval = heartbeatInterval
p.floodPublish = floodPublish
-  p.historyLength = 20
+  p.historyLength = historyLength
p.historyGossip = 20
p.unsubscribeBackoff = unsubscribeBackoff
+  p.pruneBackoff = pruneBackoff
+  p.fanoutTTL = fanoutTTL
p.enablePX = enablePX
p.overheadRateLimit = overheadRateLimit
p.sendIDontWantOnPublish = sendIDontWantOnPublish
+  p.opportunisticGraftThreshold = opportunisticGraftThreshold
if gossipFactor.isSome: p.gossipFactor = gossipFactor.get
applyDValues(p, dValues)
p
@@ -256,34 +263,35 @@ proc connectNodesSparse*[T: PubSub](nodes: seq[T], degree: int = 2) {.async.} =
if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
await connectNodes(dialer, node)
-proc activeWait(
-    interval: Duration, maximum: Moment, timeoutErrorMessage = "Timeout on activeWait"
-) {.async.} =
-  await sleepAsync(interval)
-  doAssert Moment.now() < maximum, timeoutErrorMessage
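+# Polls `condition` every `interval` until it holds; raises a CatchableError
+# with `timeoutErrorMessage` once `timeout` elapses. Replaces the removed
+# `activeWait` helper. Because `condition` is an untyped template parameter,
+# the expression is re-evaluated on every iteration.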
+template waitForCondition*(
+    condition: untyped,
+    interval: Duration,
+    timeout: Duration,
+    timeoutErrorMessage = "waitForCondition timeout",
+): untyped =
+  let maxTime = Moment.now() + timeout
+  while not condition:
+    await sleepAsync(interval)
+    if Moment.now() >= maxTime:
+      raise (ref CatchableError)(msg: timeoutErrorMessage)
proc waitSub*(sender, receiver: auto, key: string) {.async.} =
if sender == receiver:
return
-  let timeout = Moment.now() + 5.seconds
  let fsub = GossipSub(sender)
+  let peerId = receiver.peerInfo.peerId
  # this is for testing purposes only
  # peers can be inside `mesh` and `fanout`, not just `gossipsub`
-  while (
-    not fsub.gossipsub.hasKey(key) or
-    not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)
-  ) and
-    (
-      not fsub.mesh.hasKey(key) or
-      not fsub.mesh.hasPeerId(key, receiver.peerInfo.peerId)
-    ) and (
-      not fsub.fanout.hasKey(key) or
-      not fsub.fanout.hasPeerId(key, receiver.peerInfo.peerId)
-    ):
-    trace "waitSub sleeping..."
-    await activeWait(5.milliseconds, timeout, "waitSub timeout!")
+  waitForCondition(
+    (fsub.gossipsub.hasKey(key) and fsub.gossipsub.hasPeerId(key, peerId)) or
+      (fsub.mesh.hasKey(key) and fsub.mesh.hasPeerId(key, peerId)) or
+      (fsub.fanout.hasKey(key) and fsub.fanout.hasPeerId(key, peerId)),
+    5.milliseconds,
+    5.seconds,
+    "waitSub timeout!",
+  )
proc waitSubAllNodes*(nodes: seq[auto], topic: string) {.async.} =
let numberOfNodes = nodes.len
@@ -293,8 +301,7 @@ proc waitSubAllNodes*(nodes: seq[auto], topic: string) {.async.} =
await waitSub(nodes[x], nodes[y], topic)
proc waitSubGraph*[T: PubSub](nodes: seq[T], key: string) {.async.} =
-  let timeout = Moment.now() + 5.seconds
-  while true:
+  proc isGraphConnected(): bool =
var
nodesMesh: Table[PeerId, seq[PeerId]]
seen: HashSet[PeerId]
@@ -314,10 +321,12 @@ proc waitSubGraph*[T: PubSub](nodes: seq[T], key: string) {.async.} =
explore(n.peerInfo.peerId)
if seen.len == nodes.len:
ok.inc()
-      if ok == nodes.len:
-        return
-    trace "waitSubGraph sleeping..."
-    await activeWait(5.milliseconds, timeout, "waitSubGraph timeout!")
+    return ok == nodes.len
+  waitForCondition(
+    isGraphConnected(), 10.milliseconds, 5.seconds, "waitSubGraph timeout!"
+  )
proc waitForMesh*(
sender: auto, receiver: auto, key: string, timeoutDuration = 5.seconds
@@ -326,13 +335,15 @@ proc waitForMesh*(
return
let
-    timeoutMoment = Moment.now() + timeoutDuration
    gossipsubSender = GossipSub(sender)
    receiverPeerId = receiver.peerInfo.peerId
-  while not gossipsubSender.mesh.hasPeerId(key, receiverPeerId):
-    trace "waitForMesh sleeping..."
-    await activeWait(5.milliseconds, timeoutMoment, "waitForMesh timeout!")
+  waitForCondition(
+    gossipsubSender.mesh.hasPeerId(key, receiverPeerId),
+    5.milliseconds,
+    timeoutDuration,
+    "waitForMesh timeout!",
+  )
type PeerTableType* {.pure.} = enum
Gossipsub = "gossipsub"
@@ -344,21 +355,17 @@ proc waitForPeersInTable*(
topic: string,
peerCounts: seq[int],
table: PeerTableType,
-    timeout = 5.seconds,
+    timeout = 3.seconds,
) {.async.} =
## Wait until each node in `nodes` has at least the corresponding number of peers from `peerCounts`
## in the specified table (mesh, gossipsub, or fanout) for the given topic
doAssert nodes.len == peerCounts.len, "Node count must match peer count expectations"
-  # Helper proc to check current state and update satisfaction status
-  proc checkState(
-      nodes: seq[auto],
-      topic: string,
-      peerCounts: seq[int],
-      table: PeerTableType,
-      satisfied: var seq[bool],
-  ): bool =
+  var satisfied = newSeq[bool](nodes.len)
+  proc checkPeersCondition(): bool =
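+    # Only re-check nodes that have not yet reached their expected peer count.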
for i in 0 ..< nodes.len:
if not satisfied[i]:
let fsub = GossipSub(nodes[i])
@@ -373,20 +380,17 @@ proc waitForPeersInTable*(
satisfied[i] = currentCount >= peerCounts[i]
return satisfied.allIt(it)
-  let timeoutMoment = Moment.now() + timeout
-  var
-    satisfied = newSeq[bool](nodes.len)
-    allSatisfied = false
-  allSatisfied = checkState(nodes, topic, peerCounts, table, satisfied) # Initial check
-  # Continue checking until all requirements are met or timeout
-  while not allSatisfied:
-    await activeWait(
-      5.milliseconds,
-      timeoutMoment,
-      "Timeout waiting for peer counts in " & $table & " for topic " & topic,
-    )
-    allSatisfied = checkState(nodes, topic, peerCounts, table, satisfied)
+  waitForCondition(
+    checkPeersCondition(),
+    10.milliseconds,
+    timeout,
+    "Timeout waiting for peer counts in " & $table & " for topic " & topic,
+  )
proc waitForPeersInTable*(
node: auto, topic: string, peerCount: int, table: PeerTableType, timeout = 1.seconds
) {.async.} =
await waitForPeersInTable(@[node], topic, @[peerCount], table, timeout)
proc startNodes*[T: PubSub](nodes: seq[T]) {.async.} =
await allFuturesThrowing(nodes.mapIt(it.switch.start()))
@@ -512,6 +516,20 @@ proc addIDontWantObservers*[T: PubSub](
return allMessages
+proc findAndUnsubscribePeers*[T: PubSub](
+    nodes: seq[T], peers: seq[PeerId], topic: string, handler: TopicHandler
+) =
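+  ## Unsubscribes `handler` from `topic` on every node whose PeerId appears
+  ## in `peers`.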
+  for i in 0 ..< nodes.len:
+    let node = nodes[i]
+    if peers.anyIt(it == node.peerInfo.peerId):
+      node.unsubscribe(topic, handler)
+proc clearMCache*[T: PubSub](node: T) =
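+  ## Empties the message cache and all of its history windows so heartbeats
+  ## stop emitting IHave for previously published messages.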
+  node.mcache.msgs.clear()
+  for i in 0 ..< node.mcache.history.len:
+    node.mcache.history[i].setLen(0)
+  node.mcache.pos = 0
# TODO: refactor helper methods from testgossipsub.nim
proc setupNodes*(count: int): seq[PubSub] =
generateNodes(count, gossip = true)