test(gossipsub): split unit and integration tests (#1465)

Author: Radosław Kamiński
Date: 2025-06-16 16:18:18 +01:00
Committed by: GitHub
Parent: 2eafac47e8
Commit: d803352bd6
15 changed files with 804 additions and 746 deletions
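Every import in the moved test files gains one `../` level (for example `../../libp2p/` becomes `../../../libp2p/`), which shows that the integration suites now live one directory deeper. A sketch of the resulting layout, reconstructed from the import paths and the two runner files in this commit (the `tests/pubsub/` prefix is an assumption inferred from the relative paths):

    tests/pubsub/
      testpubsub.nim                  (updated runner; still runs everything)
      testbehavior.nim                (new, unit tests)
      testgossipsub.nim               (new, unit tests)
      testscoring.nim                 (new, unit tests)
      testgossipsubparams.nim, testmcache.nim, testmessage.nim,
      testtimedcache.nim              (pre-existing unit tests, unchanged)
      integration/
        testpubsubintegration.nim     (new integration runner)
        testfloodsub.nim, testgossipsubcontrolmessages.nim,
        testgossipsubcustomconn.nim, testgossipsubfanout.nim,
        testgossipsubgossip.nim, testgossipsubheartbeat.nim,
        testgossipsubmeshmanagement.nim, testgossipsubmessagecache.nim,
        testgossipsubmessagehandling.nim, testgossipsubscoring.nim
                                      (moved here; suites renamed "... Integration - ...")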


@@ -12,8 +12,8 @@
import sequtils, tables, sets
import chronos, stew/byteutils
import
-  utils,
-  ../../libp2p/[
+  ../utils,
+  ../../../libp2p/[
switch,
stream/connection,
crypto/crypto,
@@ -23,9 +23,9 @@ import
protocols/pubsub/peertable,
protocols/pubsub/pubsubpeer,
]
-import ../../libp2p/protocols/pubsub/errors as pubsub_errors
+import ../../../libp2p/protocols/pubsub/errors as pubsub_errors
-import ../helpers
+import ../../helpers
proc waitSub(sender, receiver: auto, key: string) {.async.} =
# make things deterministic
@@ -38,7 +38,7 @@ proc waitSub(sender, receiver: auto, key: string) {.async.} =
dec ceil
doAssert(ceil > 0, "waitSub timeout!")
suite "FloodSub":
suite "FloodSub Integration":
teardown:
checkTrackers()


@@ -1,106 +1,15 @@
{.used.}
import std/[sequtils]
import stew/byteutils
-import utils
-import chronicles
-import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
-import ../helpers
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
+import ../../helpers
suite "GossipSub Control Messages":
suite "GossipSub Integration - Control Messages":
teardown:
checkTrackers()
asyncTest "handleIHave - peers with no budget should not request messages":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.subscribe(topic, voidTopicHandler)
let peerId = randomPeerId()
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IHAVE message that contains the same message ID three times
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
# Given the peer has no budget to request messages
peer.iHaveBudget = 0
# When a peer makes an IHAVE request for a message that `gossipSub` has
let iwants = gossipSub.handleIHave(peer, @[msg])
# Then `gossipSub` should not generate an IWant message for the message
check:
iwants.messageIDs.len == 0
gossipSub.mcache.msgs.len == 1
asyncTest "handleIHave - peers with budget should request messages":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.subscribe(topic, voidTopicHandler)
let peerId = randomPeerId()
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IHAVE message that contains the same message ID three times
# If ids are repeated, only one request should be generated
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
# Given the budget is not 0 (because it's not been overridden)
check:
peer.iHaveBudget > 0
# When a peer makes an IHAVE request for a message that `gossipSub` does not have
let iwants = gossipSub.handleIHave(peer, @[msg])
# Then `gossipSub` should generate an IWant message for the message
check:
iwants.messageIDs.len == 1
gossipSub.mcache.msgs.len == 1
asyncTest "handleIWant - peers with budget should request messages":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.subscribe(topic, voidTopicHandler)
let peerId = randomPeerId()
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IWANT message that contains the same message ID three times
# If ids are repeated, only one request should be generated
let msg = ControlIWant(messageIDs: @[id, id, id])
# When a peer makes an IWANT request for a message that `gossipSub` has
let messages = gossipSub.handleIWant(peer, @[msg])
# Then `gossipSub` should return the message
check:
messages.len == 1
gossipSub.mcache.msgs.len == 1
asyncTest "GRAFT messages correctly add peers to mesh":
# Given 2 nodes
let
@@ -512,32 +421,3 @@ suite "GossipSub Control Messages":
check:
toSeq(nodeC.mesh.getOrDefault(topic)).allIt(it.iDontWants.allIt(it.len == 0))
toSeq(nodeA.mesh.getOrDefault(topic)).allIt(it.iDontWants.allIt(it.len == 0))
asyncTest "Max IDONTWANT messages per heartbeat per peer":
# Given GossipSub node with 1 peer
let
topic = "foobar"
totalPeers = 1
let (gossipSub, conns, peers) = setupGossipSubWithPeers(totalPeers, topic)
defer:
await teardownGossipSub(gossipSub, conns)
let peer = peers[0]
# And a sequence of iDontWants with more message IDs than the maximum (1200)
proc generateMessageIds(count: int): seq[MessageId] =
return (0 ..< count).mapIt(("msg_id_" & $it & $Moment.now()).toBytes())
let iDontWants =
@[
ControlIWant(messageIDs: generateMessageIds(600)),
ControlIWant(messageIDs: generateMessageIds(600)),
]
# When node handles iDontWants
gossipSub.handleIDontWant(peer, iDontWants)
# Then it saves at most IDontWantMaxCount messages in the history and drops the rest
check:
peer.iDontWants[0].len == IDontWantMaxCount


@@ -11,11 +11,11 @@
import chronos
import stew/byteutils
-import utils
-import ../../libp2p/protocols/pubsub/[gossipsub, peertable, pubsubpeer]
-import ../../libp2p/protocols/pubsub/rpc/[messages]
-import ../../libp2p/stream/connection
-import ../helpers
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, peertable, pubsubpeer]
+import ../../../libp2p/protocols/pubsub/rpc/[messages]
+import ../../../libp2p/stream/connection
+import ../../helpers
type DummyConnection* = ref object of Connection
@@ -30,7 +30,7 @@ proc new*(T: typedesc[DummyConnection]): DummyConnection =
let instance = T()
instance
suite "GossipSub Custom Connection Support":
suite "GossipSub Integration - Custom Connection Support":
teardown:
checkTrackers()


@@ -9,66 +9,18 @@
{.used.}
import std/[sequtils]
import stew/byteutils
-import chronicles
-import utils
-import ../../libp2p/protocols/pubsub/[gossipsub, peertable]
-import ../../libp2p/protocols/pubsub/rpc/[messages]
-import ../helpers
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, peertable]
+import ../../../libp2p/protocols/pubsub/rpc/[messages]
+import ../../helpers
suite "GossipSub Fanout Management":
suite "GossipSub Integration - Fanout Management":
teardown:
checkTrackers()
asyncTest "`replenishFanout` Degree Lo":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.gossipsub[topic].len == 15
gossipSub.replenishFanout(topic)
check gossipSub.fanout[topic].len == gossipSub.parameters.d
asyncTest "`dropFanoutPeers` drop expired fanout topics":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(6, topic, populateGossipsub = true, populateFanout = true)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
await sleepAsync(5.millis) # allow the topic to expire
check gossipSub.fanout[topic].len == gossipSub.parameters.d
gossipSub.dropFanoutPeers()
check topic notin gossipSub.fanout
asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
let
topic1 = "foobar1"
topic2 = "foobar2"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
6, @[topic1, topic2], populateGossipsub = true, populateFanout = true
)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis)
gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes)
await sleepAsync(5.millis) # allow first topic to expire
check gossipSub.fanout[topic1].len == gossipSub.parameters.d
check gossipSub.fanout[topic2].len == gossipSub.parameters.d
gossipSub.dropFanoutPeers()
check topic1 notin gossipSub.fanout
check topic2 in gossipSub.fanout
asyncTest "e2e - GossipSub send over fanout A -> B":
asyncTest "GossipSub send over fanout A -> B":
let (passed, handler) = createCompleteHandler()
let nodes = generateNodes(2, gossip = true)
@@ -107,7 +59,7 @@ suite "GossipSub Fanout Management":
check observed == 2
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
asyncTest "GossipSub send over fanout A -> B for subscribed topic":
let (passed, handler) = createCompleteHandler()
let nodes = generateNodes(2, gossip = true, unsubscribeBackoff = 10.minutes)


@@ -12,129 +12,15 @@
import std/[sequtils]
import stew/byteutils
-import chronicles
-import utils
-import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
-import ../../libp2p/protocols/pubsub/rpc/[message]
-import ../helpers, ../utils/[futures]
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
+import ../../../libp2p/protocols/pubsub/rpc/[message]
+import ../../helpers, ../../utils/[futures]
const MsgIdSuccess = "msg id gen success"
suite "GossipSub Gossip Protocol":
suite "GossipSub Integration - Gossip Protocol":
teardown:
checkTrackers()
asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
let topic = "foobar"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(45, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i in 0 ..< 30:
let peer = peers[i]
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer)
else:
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
# generate gossipsub (free standing) peers
for i in 30 ..< 45:
let peer = peers[i]
gossipSub.gossipsub[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
inc seqno
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
check gossipSub.fanout[topic].len == 15
check gossipSub.mesh[topic].len == 15
check gossipSub.gossipsub[topic].len == 15
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
for p in gossipPeers.keys:
check not gossipSub.fanout.hasPeerId(topic, p.peerId)
check not gossipSub.mesh.hasPeerId(topic, p.peerId)
asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
let topic = "foobar"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer)
else:
gossipSub.gossipsub[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
inc seqno
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
let topic = "foobar"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer)
gossipSub.grafted(peer, topic)
else:
gossipSub.gossipsub[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
inc seqno
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
let topic = "foobar"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer)
gossipSub.grafted(peer, topic)
else:
gossipSub.fanout[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
inc seqno
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == 0
asyncTest "messages sent to peers not in the mesh are propagated via gossip":
let
numberOfNodes = 5
@@ -314,7 +200,7 @@ suite "GossipSub Gossip Protocol":
messages[].mapIt(it[].len)[1] == 0
messages[].mapIt(it[].len)[0] == 0
asyncTest "e2e - GossipSub peer exchange":
asyncTest "GossipSub peer exchange":
# A, B & C are subscribed to something
# B unsubscribes from it; it should send
# PX to A & C


@@ -1,11 +1,11 @@
{.used.}
import std/[sequtils]
-import utils
-import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
-import ../helpers
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
+import ../../helpers
suite "GossipSub Heartbeat":
suite "GossipSub Integration - Heartbeat":
teardown:
checkTrackers()


@@ -11,194 +11,14 @@
import chronicles
import std/[sequtils]
-import utils
-import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
-import ../helpers
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
+import ../../helpers
suite "GossipSub Mesh Management":
suite "GossipSub Integration - Mesh Management":
teardown:
checkTrackers()
asyncTest "subscribe/unsubscribeAll":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
# test via dynamic dispatch
gossipSub.PubSub.subscribe(topic, voidTopicHandler)
check:
gossipSub.topics.contains(topic)
gossipSub.gossipsub[topic].len() > 0
gossipSub.mesh[topic].len() > 0
# test via dynamic dispatch
gossipSub.PubSub.unsubscribeAll(topic)
check:
topic notin gossipSub.topics # not in local topics
topic notin gossipSub.mesh # not in mesh
topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)
asyncTest "`rebalanceMesh` Degree Lo":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == gossipSub.parameters.d
asyncTest "rebalanceMesh - bad peers":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
var scoreLow = -11'f64
for peer in peers:
peer.score = scoreLow
scoreLow += 1.0
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
# low-score peers should not be in the mesh, so the count must be 4
check gossipSub.mesh[topic].len == 4
for peer in gossipSub.mesh[topic]:
check peer.score >= 0.0
asyncTest "`rebalanceMesh` Degree Hi":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.mesh[topic].len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len ==
gossipSub.parameters.d + gossipSub.parameters.dScore
asyncTest "rebalanceMesh fail due to backoff":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
for peer in peers:
gossipSub.backingOff.mgetOrPut(topic, initTable[PeerId, Moment]()).add(
peer.peerId, Moment.now() + 1.hours
)
let prunes = gossipSub.handleGraft(peer, @[ControlGraft(topicID: topic)])
# there must be a control prune due to violation of backoff
check prunes.len != 0
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
# expect 0 since they are all backing off
check gossipSub.mesh[topic].len == 0
asyncTest "rebalanceMesh fail due to backoff - remote":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len != 0
for peer in peers:
gossipSub.handlePrune(
peer,
@[
ControlPrune(
topicID: topic,
peers: @[],
backoff: gossipSub.parameters.pruneBackoff.seconds.uint64,
)
],
)
# expect the topic to be cleaned up since all peers are pruned
check topic notin gossipSub.mesh
asyncTest "rebalanceMesh Degree Hi - audit scenario":
let
topic = "foobar"
numInPeers = 6
numOutPeers = 7
totalPeers = numInPeers + numOutPeers
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
totalPeers, topic, populateGossipsub = true, populateMesh = true
)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.parameters.dScore = 4
gossipSub.parameters.d = 6
gossipSub.parameters.dOut = 3
gossipSub.parameters.dHigh = 12
gossipSub.parameters.dLow = 4
for i in 0 ..< numInPeers:
let conn = conns[i]
let peer = peers[i]
conn.transportDir = Direction.In
peer.score = 40.0
for i in numInPeers ..< totalPeers:
let conn = conns[i]
let peer = peers[i]
conn.transportDir = Direction.Out
peer.score = 10.0
check gossipSub.mesh[topic].len == 13
gossipSub.rebalanceMesh(topic)
# ensure we are above dLow
check gossipSub.mesh[topic].len > gossipSub.parameters.dLow
var outbound = 0
for peer in gossipSub.mesh[topic]:
if peer.sendConn.transportDir == Direction.Out:
inc outbound
# ensure we prioritize outbound peers and keep at least dOut of them
check outbound >= gossipSub.parameters.dOut
asyncTest "rebalanceMesh Degree Hi - dScore controls number of peers to retain by score when pruning":
# Given GossipSub node starting with 13 peers in mesh
let
topic = "foobar"
totalPeers = 13
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
totalPeers, topic, populateGossipsub = true, populateMesh = true
)
defer:
await teardownGossipSub(gossipSub, conns)
# And mesh is larger than dHigh
gossipSub.parameters.dLow = 4
gossipSub.parameters.d = 6
gossipSub.parameters.dHigh = 8
gossipSub.parameters.dOut = 3
gossipSub.parameters.dScore = 13
check gossipSub.mesh[topic].len == totalPeers
# When mesh is rebalanced
gossipSub.rebalanceMesh(topic)
# Then pruning is not triggered when the mesh is not larger than dScore
check gossipSub.mesh[topic].len == totalPeers
asyncTest "Nodes graft peers according to DValues - numberOfNodes < dHigh":
let
numberOfNodes = 5
@@ -242,7 +62,7 @@ suite "GossipSub Mesh Management":
node.mesh.getOrDefault(topic).len <= dHigh
node.fanout.len == 0
asyncTest "e2e - GossipSub should add remote peer topic subscriptions":
asyncTest "GossipSub should add remote peer topic subscriptions":
proc handler(topic: string, data: seq[byte]) {.async.} =
discard
@@ -261,7 +81,7 @@ suite "GossipSub Mesh Management":
"foobar" in gossip1.gossipsub
gossip1.gossipsub.hasPeerId("foobar", gossip2.peerInfo.peerId)
asyncTest "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed":
asyncTest "GossipSub should add remote peer topic subscriptions if both peers are subscribed":
proc handler(topic: string, data: seq[byte]) {.async.} =
discard


@@ -2,12 +2,12 @@
import std/[sequtils]
import stew/byteutils
-import utils
-import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, floodsub]
-import ../../libp2p/protocols/pubsub/rpc/[messages, message]
-import ../helpers
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, floodsub]
+import ../../../libp2p/protocols/pubsub/rpc/[messages, message]
+import ../../helpers
suite "GossipSub Message Cache":
suite "GossipSub Integration - Message Cache":
teardown:
checkTrackers()


@@ -11,12 +11,12 @@
import std/[sequtils, enumerate]
import stew/byteutils
-import utils
-import sugar
-import chronicles
-import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, timedcache]
-import ../../libp2p/protocols/pubsub/rpc/[message, protobuf]
-import ../helpers, ../utils/[futures]
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, timedcache]
+import ../../../libp2p/protocols/pubsub/rpc/[message]
+import ../../helpers, ../../utils/[futures]
const MsgIdSuccess = "msg id gen success"
@@ -72,62 +72,11 @@ proc createMessages(
return (iwantMessageIds, sentMessages)
suite "GossipSub Message Handling":
suite "GossipSub Integration - Message Handling":
teardown:
checkTrackers()
asyncTest "Drop messages of topics without subscription":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
let peer = peers[i]
inc seqno
let msg = Message.init(conn.peerId, ("bar" & $i).toBytes(), topic, some(seqno))
await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false))
check gossipSub.mcache.msgs.len == 0
asyncTest "subscription limits":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.topicsHigh = 10
var tooManyTopics: seq[string]
for i in 0 .. gossipSub.topicsHigh + 10:
tooManyTopics &= "topic" & $i
let lotOfSubs = RPCMsg.withSubs(tooManyTopics, true)
let conn = TestBufferStream.new(noop)
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
await gossipSub.rpcHandler(peer, encodeRpcMsg(lotOfSubs, false))
check:
gossipSub.gossipsub.len == gossipSub.topicsHigh
peer.behaviourPenalty > 0.0
await conn.close()
await gossipSub.switch.stop()
asyncTest "invalid message bytes":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let peerId = randomPeerId()
let peer = gossipSub.getPubSubPeer(peerId)
expect(CatchableError):
await gossipSub.rpcHandler(peer, @[byte 1, 2, 3])
await gossipSub.switch.stop()
asyncTest "e2e - Split IWANT replies when individual messages are below maxSize but combined exceed maxSize":
asyncTest "Split IWANT replies when individual messages are below maxSize but combined exceed maxSize":
# This test checks if two messages, each below the maxSize, are correctly split when their combined size exceeds maxSize.
# Expected: Both messages should be received.
let (gossip0, gossip1, receivedMessages) = await setupTest()
@@ -154,7 +103,7 @@ suite "GossipSub Message Handling":
await teardownTest(gossip0, gossip1)
asyncTest "e2e - Discard IWANT replies when both messages individually exceed maxSize":
asyncTest "Discard IWANT replies when both messages individually exceed maxSize":
# This test checks if two messages, each exceeding the maxSize, are discarded and not sent.
# Expected: No messages should be received.
let (gossip0, gossip1, receivedMessages) = await setupTest()
@@ -181,7 +130,7 @@ suite "GossipSub Message Handling":
await teardownTest(gossip0, gossip1)
asyncTest "e2e - Process IWANT replies when both messages are below maxSize":
asyncTest "Process IWANT replies when both messages are below maxSize":
# This test checks if two messages, both below the maxSize, are correctly processed and sent.
# Expected: Both messages should be received.
let (gossip0, gossip1, receivedMessages) = await setupTest()
@@ -208,7 +157,7 @@ suite "GossipSub Message Handling":
await teardownTest(gossip0, gossip1)
asyncTest "e2e - Split IWANT replies when one message is below maxSize and the other exceeds maxSize":
asyncTest "Split IWANT replies when one message is below maxSize and the other exceeds maxSize":
# This test checks if, when given two messages where one is below maxSize and the other exceeds it, only the smaller message is processed and sent.
# Expected: Only the smaller message should be received.
let (gossip0, gossip1, receivedMessages) = await setupTest()
@@ -469,7 +418,7 @@ suite "GossipSub Message Handling":
validatedCounter == 1
sendCounter == 2
asyncTest "e2e - GossipSub send over mesh A -> B":
asyncTest "GossipSub send over mesh A -> B":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
@@ -499,7 +448,7 @@ suite "GossipSub Message Handling":
gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId)
not gossip2.fanout.hasPeerId("foobar", gossip1.peerInfo.peerId)
asyncTest "e2e - GossipSub should not send to source & peers who already seen":
asyncTest "GossipSub should not send to source & peers who already seen":
# 3 nodes: A, B, C
# A publishes, C relays, B is having a long validation
# so B should not send to anyone
@@ -565,7 +514,7 @@ suite "GossipSub Message Handling":
await bFinished
asyncTest "e2e - GossipSub send over floodPublish A -> B":
asyncTest "GossipSub send over floodPublish A -> B":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
@@ -595,7 +544,7 @@ suite "GossipSub Message Handling":
"foobar" notin gossip2.gossipsub
not gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId)
asyncTest "e2e - GossipSub floodPublish limit":
asyncTest "GossipSub floodPublish limit":
let
nodes = setupNodes(20)
gossip1 = GossipSub(nodes[0])
@@ -607,7 +556,7 @@ suite "GossipSub Message Handling":
await connectNodes(nodes[1 ..^ 1], nodes[0])
await baseTestProcedure(nodes, gossip1, gossip1.parameters.dLow, 17)
asyncTest "e2e - GossipSub floodPublish limit with bandwidthEstimatebps = 0":
asyncTest "GossipSub floodPublish limit with bandwidthEstimatebps = 0":
let
nodes = setupNodes(20)
gossip1 = GossipSub(nodes[0])
@@ -620,7 +569,7 @@ suite "GossipSub Message Handling":
await connectNodes(nodes[1 ..^ 1], nodes[0])
await baseTestProcedure(nodes, gossip1, nodes.len - 1, nodes.len - 1)
asyncTest "e2e - GossipSub with multiple peers":
asyncTest "GossipSub with multiple peers":
var runs = 10
let nodes = generateNodes(runs, gossip = true, triggerSelf = true)
@@ -662,7 +611,7 @@ suite "GossipSub Message Handling":
check:
"foobar" in gossip.gossipsub
asyncTest "e2e - GossipSub with multiple peers (sparse)":
asyncTest "GossipSub with multiple peers (sparse)":
var runs = 10
let nodes = generateNodes(runs, gossip = true, triggerSelf = true)
@@ -711,7 +660,7 @@ suite "GossipSub Message Handling":
gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0
asyncTest "e2e - GossipSub with multiple peers - control deliver (sparse)":
asyncTest "GossipSub with multiple peers - control deliver (sparse)":
var runs = 10
let nodes = generateNodes(runs, gossip = true, triggerSelf = true)


@@ -11,42 +11,16 @@
import std/[sequtils]
import stew/byteutils
-import utils
-import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, pubsubpeer]
-import ../../libp2p/protocols/pubsub/rpc/[messages]
-import ../../libp2p/muxers/muxer
-import ../helpers
-import ../utils/[futures]
+import ../utils
+import ../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, pubsubpeer]
+import ../../../libp2p/protocols/pubsub/rpc/[messages]
+import ../../helpers
+import ../../utils/[futures]
suite "GossipSub Scoring":
suite "GossipSub Integration - Scoring":
teardown:
checkTrackers()
asyncTest "Disconnect bad peers":
let topic = "foobar"
var (gossipSub, conns, peers) =
setupGossipSubWithPeers(30, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.parameters.disconnectBadPeers = true
gossipSub.parameters.appSpecificWeight = 1.0
for i, peer in peers:
peer.appScore = gossipSub.parameters.graylistThreshold - 1
let conn = conns[i]
gossipSub.switch.connManager.storeMuxer(Muxer(connection: conn))
gossipSub.updateScores()
await sleepAsync(100.millis)
check:
# test our disconnect mechanics
gossipSub.gossipsub.peers(topic) == 0
# also ensure we properly clean up the peersInIP table
gossipSub.peersInIP.len == 0
asyncTest "Flood publish to all peers with score above threshold, regardless of subscription":
let
numberOfNodes = 3
@@ -405,140 +379,3 @@ suite "GossipSub Scoring":
check:
nodes[0].peerStats[nodes[1].peerInfo.peerId].topicInfos[topic].meshMessageDeliveries in
50.0 .. 66.0
asyncTest "GossipThreshold - do not handle IHave if peer score is below threshold":
const
topic = "foobar"
gossipThreshold = -100.0
let
(gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below GossipThreshold
gossipSub.parameters.gossipThreshold = gossipThreshold
peer.score = gossipThreshold - 100.0
# and IHave message
let id = @[0'u8, 1, 2, 3]
let msg = ControlIHave(topicID: topic, messageIDs: @[id])
# When IHave is handled
let iWant = gossipSub.handleIHave(peer, @[msg])
# Then IHave is ignored
check:
iWant.messageIDs.len == 0
asyncTest "GossipThreshold - do not handle IWant if peer score is below threshold":
const
topic = "foobar"
gossipThreshold = -100.0
let
(gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below GossipThreshold
gossipSub.parameters.gossipThreshold = gossipThreshold
peer.score = gossipThreshold - 100.0
# and IWant message with MsgId in mcache and sentIHaves
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[0].incl(id)
let msg = ControlIWant(messageIDs: @[id])
# When IWant is handled
let messages = gossipSub.handleIWant(peer, @[msg])
# Then IWant is ignored
check:
messages.len == 0
asyncTest "GossipThreshold - do not trigger PeerExchange on Prune":
const
topic = "foobar"
gossipThreshold = -100.0
let
(gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below GossipThreshold
gossipSub.parameters.gossipThreshold = gossipThreshold
peer.score = gossipThreshold - 100.0
# and RoutingRecordsHandler added
var routingRecordsFut = newFuture[void]()
gossipSub.routingRecordsHandler.add(
proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
routingRecordsFut.complete()
)
# and Prune message
let msg = ControlPrune(
topicID: topic, peers: @[PeerInfoMsg(peerId: peer.peerId)], backoff: 123'u64
)
# When Prune is handled
gossipSub.handlePrune(peer, @[msg])
# Then handler is not triggered
let result = await waitForState(routingRecordsFut, HEARTBEAT_TIMEOUT)
check:
result.isCancelled()
asyncTest "GossipThreshold - do not select peer for IHave broadcast if peer score is below threshold":
const
topic = "foobar"
gossipThreshold = -100.0
let
(gossipSub, conns, peers) =
setupGossipSubWithPeers(1, topic, populateGossipsub = true)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below GossipThreshold
gossipSub.parameters.gossipThreshold = gossipThreshold
peer.score = gossipThreshold - 100.0
# and message in cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message(topic: topic))
# When Node selects peers for IHave broadcast
let gossipPeers = gossipSub.getGossipPeers()
# Then peer is not selected
check:
gossipPeers.len == 0
asyncTest "PublishThreshold - do not graft when peer score below threshold":
const
topic = "foobar"
publishThreshold = -100.0
let
(gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below publishThreshold
gossipSub.parameters.publishThreshold = publishThreshold
peer.score = publishThreshold - 100.0
# and Graft message
let msg = ControlGraft(topicID: topic)
# When Graft is handled
let prunes = gossipSub.handleGraft(peer, @[msg])
# Then peer is ignored and not added to prunes
check:
gossipSub.mesh[topic].len == 0
prunes.len == 0


@@ -0,0 +1,7 @@
{.used.}
import
testfloodsub, testgossipsubcontrolmessages, testgossipsubcustomconn,
testgossipsubfanout, testgossipsubgossip, testgossipsubheartbeat,
testgossipsubmeshmanagement, testgossipsubmessagecache, testgossipsubmessagehandling,
testgossipsubscoring
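
This aggregator works because, with the unittest conventions used in this repo, `suite` blocks execute when their module initializes: importing the test modules is enough to register and run every suite, and the `{.used.}` pragma suppresses unused-module warnings. A minimal sketch of the same pattern (a hypothetical two-module aggregator, module names taken from this commit):

    {.used.}
    # Importing a test module runs its top-level `suite` blocks when the
    # final test binary initializes, so an aggregator needs only imports.
    import testfloodsub, testgossipsubheartbeat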


@@ -0,0 +1,588 @@
{.used.}
import std/[sequtils]
import stew/byteutils
import utils
import chronicles
import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
import ../../libp2p/protocols/pubsub/rpc/[message]
import ../helpers
import ../utils/[futures]
const MsgIdSuccess = "msg id gen success"
suite "GossipSub Behavior":
teardown:
checkTrackers()
asyncTest "handleIHave - peers with no budget should not request messages":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.subscribe(topic, voidTopicHandler)
let peerId = randomPeerId()
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IHAVE message that contains the same message ID three times
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
# Given the peer has no budget to request messages
peer.iHaveBudget = 0
# When a peer makes an IHAVE request for a message that `gossipSub` has
let iwants = gossipSub.handleIHave(peer, @[msg])
# Then `gossipSub` should not generate an IWant message for the message
check:
iwants.messageIDs.len == 0
gossipSub.mcache.msgs.len == 1
asyncTest "handleIHave - peers with budget should request messages":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.subscribe(topic, voidTopicHandler)
let peerId = randomPeerId()
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IHAVE message that contains the same message ID three times
# If ids are repeated, only one request should be generated
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
# Given the budget is not 0 (because it's not been overridden)
check:
peer.iHaveBudget > 0
# When a peer makes an IHAVE request for a message that `gossipSub` does not have
let iwants = gossipSub.handleIHave(peer, @[msg])
# Then `gossipSub` should generate an IWant message for the message
check:
iwants.messageIDs.len == 1
gossipSub.mcache.msgs.len == 1
asyncTest "handleIWant - peers with budget should request messages":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.subscribe(topic, voidTopicHandler)
let peerId = randomPeerId()
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IWANT message that contains the same message ID three times
# If ids are repeated, only one request should be generated
let msg = ControlIWant(messageIDs: @[id, id, id])
# When a peer makes an IWANT request for a message that `gossipSub` has
let messages = gossipSub.handleIWant(peer, @[msg])
# Then `gossipSub` should return the message
check:
messages.len == 1
gossipSub.mcache.msgs.len == 1
asyncTest "Max IDONTWANT messages per heartbeat per peer":
# Given GossipSub node with 1 peer
let
topic = "foobar"
totalPeers = 1
let (gossipSub, conns, peers) = setupGossipSubWithPeers(totalPeers, topic)
defer:
await teardownGossipSub(gossipSub, conns)
let peer = peers[0]
# And a sequence of iDontWants with more message IDs than the maximum (1200)
proc generateMessageIds(count: int): seq[MessageId] =
return (0 ..< count).mapIt(("msg_id_" & $it & $Moment.now()).toBytes())
let iDontWants =
@[
ControlIWant(messageIDs: generateMessageIds(600)),
ControlIWant(messageIDs: generateMessageIds(600)),
]
# When node handles iDontWants
gossipSub.handleIDontWant(peer, iDontWants)
# Then it saves at most IDontWantMaxCount messages in the history and drops the rest
check:
peer.iDontWants[0].len == IDontWantMaxCount
asyncTest "`replenishFanout` Degree Lo":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.gossipsub[topic].len == 15
gossipSub.replenishFanout(topic)
check gossipSub.fanout[topic].len == gossipSub.parameters.d
asyncTest "`dropFanoutPeers` drop expired fanout topics":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(6, topic, populateGossipsub = true, populateFanout = true)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
await sleepAsync(5.millis) # allow the topic to expire
check gossipSub.fanout[topic].len == gossipSub.parameters.d
gossipSub.dropFanoutPeers()
check topic notin gossipSub.fanout
asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
let
topic1 = "foobar1"
topic2 = "foobar2"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
6, @[topic1, topic2], populateGossipsub = true, populateFanout = true
)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis)
gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes)
await sleepAsync(5.millis) # allow first topic to expire
check gossipSub.fanout[topic1].len == gossipSub.parameters.d
check gossipSub.fanout[topic2].len == gossipSub.parameters.d
gossipSub.dropFanoutPeers()
check topic1 notin gossipSub.fanout
check topic2 in gossipSub.fanout
asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
let topic = "foobar"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(45, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i in 0 ..< 30:
let peer = peers[i]
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer)
else:
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
# generate gossipsub (free standing) peers
for i in 30 ..< 45:
let peer = peers[i]
gossipSub.gossipsub[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
inc seqno
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
check gossipSub.fanout[topic].len == 15
check gossipSub.mesh[topic].len == 15
check gossipSub.gossipsub[topic].len == 15
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
for p in gossipPeers.keys:
check not gossipSub.fanout.hasPeerId(topic, p.peerId)
check not gossipSub.mesh.hasPeerId(topic, p.peerId)
asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
let topic = "foobar"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer)
else:
gossipSub.gossipsub[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
inc seqno
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
let topic = "foobar"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer)
gossipSub.grafted(peer, topic)
else:
gossipSub.gossipsub[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
inc seqno
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
let topic = "foobar"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer)
gossipSub.grafted(peer, topic)
else:
gossipSub.fanout[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
inc seqno
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == 0
asyncTest "`rebalanceMesh` Degree Lo":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == gossipSub.parameters.d
asyncTest "rebalanceMesh - bad peers":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
var scoreLow = -11'f64
for peer in peers:
peer.score = scoreLow
scoreLow += 1.0
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
# low-score peers should not be in the mesh, so the count must be 4
check gossipSub.mesh[topic].len == 4
for peer in gossipSub.mesh[topic]:
check peer.score >= 0.0
asyncTest "`rebalanceMesh` Degree Hi":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.mesh[topic].len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len ==
gossipSub.parameters.d + gossipSub.parameters.dScore
asyncTest "rebalanceMesh fail due to backoff":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
for peer in peers:
gossipSub.backingOff.mgetOrPut(topic, initTable[PeerId, Moment]()).add(
peer.peerId, Moment.now() + 1.hours
)
let prunes = gossipSub.handleGraft(peer, @[ControlGraft(topicID: topic)])
# there must be a control prune due to violation of backoff
check prunes.len != 0
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
# expect 0 since they are all backing off
check gossipSub.mesh[topic].len == 0
asyncTest "rebalanceMesh fail due to backoff - remote":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len != 0
for peer in peers:
gossipSub.handlePrune(
peer,
@[
ControlPrune(
topicID: topic,
peers: @[],
backoff: gossipSub.parameters.pruneBackoff.seconds.uint64,
)
],
)
# expect the topic to be cleaned up since all peers are pruned
check topic notin gossipSub.mesh
asyncTest "rebalanceMesh Degree Hi - audit scenario":
let
topic = "foobar"
numInPeers = 6
numOutPeers = 7
totalPeers = numInPeers + numOutPeers
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
totalPeers, topic, populateGossipsub = true, populateMesh = true
)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.parameters.dScore = 4
gossipSub.parameters.d = 6
gossipSub.parameters.dOut = 3
gossipSub.parameters.dHigh = 12
gossipSub.parameters.dLow = 4
for i in 0 ..< numInPeers:
let conn = conns[i]
let peer = peers[i]
conn.transportDir = Direction.In
peer.score = 40.0
for i in numInPeers ..< totalPeers:
let conn = conns[i]
let peer = peers[i]
conn.transportDir = Direction.Out
peer.score = 10.0
check gossipSub.mesh[topic].len == 13
gossipSub.rebalanceMesh(topic)
# ensure we are above dLow
check gossipSub.mesh[topic].len > gossipSub.parameters.dLow
var outbound = 0
for peer in gossipSub.mesh[topic]:
if peer.sendConn.transportDir == Direction.Out:
inc outbound
# ensure we prioritize outbound peers and keep at least dOut of them
check outbound >= gossipSub.parameters.dOut
asyncTest "rebalanceMesh Degree Hi - dScore controls number of peers to retain by score when pruning":
# Given GossipSub node starting with 13 peers in mesh
let
topic = "foobar"
totalPeers = 13
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
totalPeers, topic, populateGossipsub = true, populateMesh = true
)
defer:
await teardownGossipSub(gossipSub, conns)
# And mesh is larger than dHigh
gossipSub.parameters.dLow = 4
gossipSub.parameters.d = 6
gossipSub.parameters.dHigh = 8
gossipSub.parameters.dOut = 3
gossipSub.parameters.dScore = 13
check gossipSub.mesh[topic].len == totalPeers
# When mesh is rebalanced
gossipSub.rebalanceMesh(topic)
# Then pruning is not triggered when the mesh is not larger than dScore
check gossipSub.mesh[topic].len == totalPeers
asyncTest "GossipThreshold - do not handle IHave if peer score is below threshold":
const
topic = "foobar"
gossipThreshold = -100.0
let
(gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below GossipThreshold
gossipSub.parameters.gossipThreshold = gossipThreshold
peer.score = gossipThreshold - 100.0
# and IHave message
let id = @[0'u8, 1, 2, 3]
let msg = ControlIHave(topicID: topic, messageIDs: @[id])
# When IHave is handled
let iWant = gossipSub.handleIHave(peer, @[msg])
# Then IHave is ignored
check:
iWant.messageIDs.len == 0
asyncTest "GossipThreshold - do not handle IWant if peer score is below threshold":
const
topic = "foobar"
gossipThreshold = -100.0
let
(gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below GossipThreshold
gossipSub.parameters.gossipThreshold = gossipThreshold
peer.score = gossipThreshold - 100.0
# and IWant message with MsgId in mcache and sentIHaves
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[0].incl(id)
let msg = ControlIWant(messageIDs: @[id])
# When IWant is handled
let messages = gossipSub.handleIWant(peer, @[msg])
# Then IWant is ignored
check:
messages.len == 0
asyncTest "GossipThreshold - do not trigger PeerExchange on Prune":
const
topic = "foobar"
gossipThreshold = -100.0
let
(gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below GossipThreshold
gossipSub.parameters.gossipThreshold = gossipThreshold
peer.score = gossipThreshold - 100.0
# and RoutingRecordsHandler added
var routingRecordsFut = newFuture[void]()
gossipSub.routingRecordsHandler.add(
proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
routingRecordsFut.complete()
)
# and Prune message
let msg = ControlPrune(
topicID: topic, peers: @[PeerInfoMsg(peerId: peer.peerId)], backoff: 123'u64
)
# When Prune is handled
gossipSub.handlePrune(peer, @[msg])
# Then handler is not triggered
let result = await waitForState(routingRecordsFut, HEARTBEAT_TIMEOUT)
check:
result.isCancelled()
asyncTest "GossipThreshold - do not select peer for IHave broadcast if peer score is below threshold":
const
topic = "foobar"
gossipThreshold = -100.0
let
(gossipSub, conns, peers) =
setupGossipSubWithPeers(1, topic, populateGossipsub = true)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below GossipThreshold
gossipSub.parameters.gossipThreshold = gossipThreshold
peer.score = gossipThreshold - 100.0
# and message in cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message(topic: topic))
# When Node selects peers for IHave broadcast
let gossipPeers = gossipSub.getGossipPeers()
# Then peer is not selected
check:
gossipPeers.len == 0
asyncTest "PublishThreshold - do not graft when peer score below threshold":
const
topic = "foobar"
publishThreshold = -100.0
let
(gossipSub, conns, peers) = setupGossipSubWithPeers(1, topic)
peer = peers[0]
defer:
await teardownGossipSub(gossipSub, conns)
# Given peer with score below publishThreshold
gossipSub.parameters.publishThreshold = publishThreshold
peer.score = publishThreshold - 100.0
# and Graft message
let msg = ControlGraft(topicID: topic)
# When Graft is handled
let prunes = gossipSub.handleGraft(peer, @[msg])
# Then peer is ignored and not added to prunes
check:
gossipSub.mesh[topic].len == 0
prunes.len == 0


@@ -0,0 +1,95 @@
# Nim-LibP2P
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.used.}
import chronicles
import stew/byteutils
import utils
import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable]
import ../../libp2p/protocols/pubsub/rpc/[message, protobuf]
import ../helpers
suite "GossipSub":
teardown:
checkTrackers()
asyncTest "subscribe/unsubscribeAll":
let topic = "foobar"
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
# test via dynamic dispatch
gossipSub.PubSub.subscribe(topic, voidTopicHandler)
check:
gossipSub.topics.contains(topic)
gossipSub.gossipsub[topic].len() > 0
gossipSub.mesh[topic].len() > 0
# test via dynamic dispatch
gossipSub.PubSub.unsubscribeAll(topic)
check:
topic notin gossipSub.topics # not in local topics
topic notin gossipSub.mesh # not in mesh
topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)
asyncTest "Drop messages of topics without subscription":
let topic = "foobar"
var (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = conns[i]
let peer = peers[i]
inc seqno
let msg = Message.init(conn.peerId, ("bar" & $i).toBytes(), topic, some(seqno))
await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false))
check gossipSub.mcache.msgs.len == 0
asyncTest "subscription limits":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.topicsHigh = 10
var tooManyTopics: seq[string]
for i in 0 .. gossipSub.topicsHigh + 10:
tooManyTopics &= "topic" & $i
let lotOfSubs = RPCMsg.withSubs(tooManyTopics, true)
let conn = TestBufferStream.new(noop)
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
await gossipSub.rpcHandler(peer, encodeRpcMsg(lotOfSubs, false))
check:
gossipSub.gossipsub.len == gossipSub.topicsHigh
peer.behaviourPenalty > 0.0
await conn.close()
await gossipSub.switch.stop()
asyncTest "invalid message bytes":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let peerId = randomPeerId()
let peer = gossipSub.getPubSubPeer(peerId)
expect(CatchableError):
await gossipSub.rpcHandler(peer, @[byte 1, 2, 3])
await gossipSub.switch.stop()


@@ -1,7 +1,7 @@
{.used.}
import
-  testgossipsubcontrolmessages, testgossipsubfanout, testgossipsubcustomconn,
-  testgossipsubgossip, testgossipsubheartbeat, testgossipsubmeshmanagement,
-  testgossipsubmessagecache, testgossipsubmessagehandling, testgossipsubparams,
-  testgossipsubscoring, testfloodsub, testmcache, testtimedcache, testmessage
+  testbehavior, testgossipsub, testgossipsubparams, testmcache, testmessage,
+  testscoring, testtimedcache
+import ./integration/testpubsubintegration
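
Note the effect of the last added line: `testpubsub.nim` still executes both trees, while the new runner lets the integration suites be compiled and run alone. A sketch of the two invocations, assuming a plain `nim c -r` from the repository root (actual builds may need the project's configured flags; the paths follow the layout inferred above):

    nim c -r tests/pubsub/testpubsub.nim                           # unit + integration
    nim c -r tests/pubsub/integration/testpubsubintegration.nim    # integration only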


@@ -0,0 +1,44 @@
# Nim-LibP2P
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.used.}
import utils
import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, pubsubpeer]
import ../../libp2p/muxers/muxer
import ../helpers
suite "GossipSub Scoring":
teardown:
checkTrackers()
asyncTest "Disconnect bad peers":
let topic = "foobar"
var (gossipSub, conns, peers) =
setupGossipSubWithPeers(30, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.parameters.disconnectBadPeers = true
gossipSub.parameters.appSpecificWeight = 1.0
for i, peer in peers:
peer.appScore = gossipSub.parameters.graylistThreshold - 1
let conn = conns[i]
gossipSub.switch.connManager.storeMuxer(Muxer(connection: conn))
gossipSub.updateScores()
await sleepAsync(100.millis)
check:
# test our disconnect mechanics
gossipSub.gossipsub.peers(topic) == 0
# also ensure we properly clean up the peersInIP table
gossipSub.peersInIP.len == 0