Compare commits

...

16 Commits

Author SHA1 Message Date
shashankshampi
e41c08fcb2 mesh check and code rebase 2024-10-06 01:16:09 +05:30
shashankshampi
f42a763708 added assertion for handle SUBSCRIBE to the topic 2024-10-03 16:39:04 +05:30
shashankshampi
19d3ead6b8 removed unwanted check 2024-10-03 16:04:22 +05:30
shashankshampi
46b712531d removed internal subscribe and unsubscribe test 2024-10-03 15:03:09 +05:30
shashankshampi
f0c8c5b6e6 review comments to remove unwanted comments 2024-10-03 15:00:17 +05:30
shashankshampi
25df50d691 updtaed as per review comment 2024-10-03 13:07:24 +05:30
shashankshampi
27c2850bc7 Merge branch 'master' into block6Test2 2024-10-02 16:33:48 +05:30
shashankshampi
9c0966e21e Merge branch 'master' into block6Test 2024-10-02 16:25:18 +05:30
shashankshampi
eb2f6bf346 test(gossipsub): Test cases covering subscribe and unsubscribe Events
added test wrt subscribe and unsubscribe

added tests/pubsub/testgossipinternal2 file

linters

feat: rendezvous refactor (#1183)

Hello!

This PR aim to refactor rendezvous code so that it is easier to impl.
Waku rdv strategy. The hardcoded min and max TTL were out of range with
what we needed and specifying which peers to interact with is also
needed since Waku deals with peers on multiple separate shards.

I tried to keep the changes to a minimum, specifically I did not change
the name of any public procs which result in less than descriptive names
in some cases. I also wanted to return results instead of raising
exceptions but didn't. Would it be acceptable to do so?

Please advise on best practices, thank you.

---------

Co-authored-by: Ludovic Chenut <ludovic@status.im>

refactor and suite name refactor

chore(ci): Enable S3 caching for interop (#1193)

- Adds our S3 bucket for caching docker images as Protocol Labs shut
down their shared one.
- Remove the free disk space workaround that prevented the jobs from
failing for using too much space for the images.

---------

Co-authored-by: diegomrsantos <diego@status.im>

PR review comment changes
2024-10-01 17:28:39 +05:30
shashankshampi
fda0d2b6e3 test(gossipsub): import optimization 2024-10-01 17:22:47 +05:30
shashankshampi
2923a2d280 test(gossipsub): added test for membership for join and leave topic 2024-10-01 17:18:08 +05:30
shashankshampi
1c2e221d75 refactor and suite name refactor 2024-09-27 10:16:49 +05:30
shashankshampi
eced002339 Merge branch 'master' into block6Test 2024-09-26 13:33:57 +05:30
shashankshampi
5790b6f428 linters 2024-09-26 13:26:17 +05:30
shashankshampi
dc7f8d4317 added tests/pubsub/testgossipinternal2 file 2024-09-26 11:33:25 +05:30
shashankshampi
871efab571 added test wrt subscribe and unsubscribe 2024-09-26 11:28:07 +05:30
3 changed files with 278 additions and 40 deletions

1
.gitignore vendored
View File

@@ -11,6 +11,7 @@ build/
*.exe
*.dll
.vscode/
.idea/
.DS_Store
tests/pubsub/testgossipsub
examples/*.md

View File

@@ -33,46 +33,6 @@ suite "GossipSub internal":
teardown:
checkTrackers()
asyncTest "subscribe/unsubscribeAll":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe.} =
discard
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
# test via dynamic dispatch
gossipSub.PubSub.subscribe(topic, handler)
check:
gossipSub.topics.contains(topic)
gossipSub.gossipsub[topic].len() > 0
gossipSub.mesh[topic].len() > 0
# test via dynamic dispatch
gossipSub.PubSub.unsubscribeAll(topic)
check:
topic notin gossipSub.topics # not in local topics
topic notin gossipSub.mesh # not in mesh
topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "topic params":
let params = TopicParams.init()
params.validateParameters().tryGet()

View File

@@ -0,0 +1,277 @@
import std/[options, deques, sequtils, enumerate, algorithm, sets]
import stew/byteutils
import ../../libp2p/builders
import ../../libp2p/errors
import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream
import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable]
import ../../libp2p/protocols/pubsub/rpc/[message, messages]
import ../../libp2p/switch
import ../../libp2p/muxers/muxer
import ../../libp2p/protocols/pubsub/rpc/protobuf
import utils
import chronos
import ../helpers
proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
const MsgIdSuccess = "msg id gen success"
suite "GossipSub Topic Membership Tests":
teardown:
checkTrackers()
# Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d
# Generalized setup function to initialize one or more topics
proc setupGossipSub(
topics: seq[string], numPeers: int
): (TestGossipSub, seq[Connection]) =
let gossipSub = TestGossipSub.init(newStandardSwitch())
var conns = newSeq[Connection]()
for topic in topics:
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< numPeers:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
return (gossipSub, conns)
# Wrapper function to initialize a single topic by converting it into a seq
proc setupGossipSub(topic: string, numPeers: int): (TestGossipSub, seq[Connection]) =
setupGossipSub(@[topic], numPeers)
# Helper function to subscribe to topics
proc subscribeToTopics(gossipSub: TestGossipSub, topics: seq[string]) =
for topic in topics:
gossipSub.PubSub.subscribe(
topic,
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
discard
,
)
# Helper function to unsubscribe to topics
proc unsubscribeFromTopics(gossipSub: TestGossipSub, topics: seq[string]) =
for topic in topics:
gossipSub.PubSub.unsubscribeAll(topic)
# Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures
asyncTest "handle SUBSCRIBE to the topic":
let topic = "test-topic"
let (gossipSub, conns) = setupGossipSub(topic, 5)
# Subscribe to the topic
subscribeToTopics(gossipSub, @[topic])
# Check if the topic is present in the list of subscribed topics
check gossipSub.topics.contains(topic)
# Check if the topic is added to gossipsub and the peers list is not empty
check gossipSub.gossipsub[topic].len() > 0
# Close all peer connections and verify that they are properly cleaned up
await allFuturesThrowing(conns.mapIt(it.close()))
# Stop the gossipSub switch and wait for it to stop completely
await gossipSub.switch.stop()
# Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from the relevant data structures but remains in gossipsub
asyncTest "handle UNSUBSCRIBE to the topic":
let topic = "test-topic"
let (gossipSub, conns) = setupGossipSub(topic, 5)
# Subscribe to the topic first
subscribeToTopics(gossipSub, @[topic])
# Now unsubscribe from the topic
unsubscribeFromTopics(gossipSub, @[topic])
# Verify the topic is removed from relevant structures
check topic notin gossipSub.topics
check topic notin gossipSub.mesh
check topic in gossipSub.gossipsub
# The topic should remain in gossipsub (for fanout)
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
# Test subscribing and unsubscribing multiple topics
asyncTest "handle SUBSCRIBE and UNSUBSCRIBE multiple topics":
let topics = ["topic1", "topic2", "topic3"].toSeq()
let (gossipSub, conns) = setupGossipSub(topics, 5)
# Subscribe to multiple topics
subscribeToTopics(gossipSub, topics)
# Verify that all topics are added to the topics and gossipsub
check gossipSub.topics.len == 3
for topic in topics:
check gossipSub.gossipsub[topic].len() >= 0
# Unsubscribe from all topics
unsubscribeFromTopics(gossipSub, topics)
# Ensure topics are removed from topics and mesh, but still present in gossipsub
for topic in topics:
check topic notin gossipSub.topics
check topic notin gossipSub.mesh
check topic in gossipSub.gossipsub
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
# Test ensuring that the number of subscriptions does not exceed the limit set in the GossipSub parameters
asyncTest "subscription limit test":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.topicsHigh = 10
var conns = newSeq[Connection]()
for i in 0 .. gossipSub.topicsHigh + 5:
let topic = "topic" & $i
# Ensure all topics are properly initialized before subscribing
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
if gossipSub.topics.len < gossipSub.topicsHigh:
gossipSub.PubSub.subscribe(
topic,
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
discard
,
)
else:
# Prevent subscription beyond the limit and log the error
echo "Subscription limit reached for topic: ", topic
# Ensure that the number of subscribed topics does not exceed the limit
check gossipSub.topics.len <= gossipSub.topicsHigh
check gossipSub.topics.len == gossipSub.topicsHigh
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
# Test for verifying peers joining a topic using `JOIN(topic)`
asyncTest "handle JOIN event":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "test-join-topic"
# Initialize relevant data structures
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]()
for i in 0 ..< 5:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
# Simulate the peer joining the topic
gossipSub.PubSub.subscribe(
topic,
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
discard
,
)
check gossipSub.mesh[topic].len > 0 # Ensure the peer is added to the mesh
check gossipSub.topics.contains(topic) # Ensure the topic is in `topics`
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
# Test for verifying peers leaving a topic using `LEAVE(topic)`
asyncTest "handle LEAVE event":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "test-leave-topic"
# Initialize relevant data structures
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]()
for i in 0 ..< 5:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
# Simulate peer joining the topic first
gossipSub.PubSub.subscribe(
topic,
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
discard
,
)
# Now simulate peer leaving the topic
gossipSub.PubSub.unsubscribeAll(topic)
check topic notin gossipSub.mesh # Ensure the peer is removed from the mesh
check topic in gossipSub.gossipsub # Ensure the topic remains in `gossipsub`
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "handle JOIN topic with update mesh":
let topic = "test-topic-join"
# Initialize the GossipSub system and simulate peer connections
let (gossipSub, conns) = setupGossipSub(topic, 5)
# Subscribe to the topic
subscribeToTopics(gossipSub, @[topic])
# Check that peers are added to the mesh after subscribing
check gossipSub.mesh[topic].len() > 0
check gossipSub.topics.contains(topic)
# Clean up by closing connections and stopping the gossipSub switch
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "handle LEAVE topic with update mesh":
let topic = "test-topic-leave"
# Initialize the GossipSub system and simulate peer connections
let (gossipSub, conns) = setupGossipSub(topic, 5)
# Subscribe to the topic
subscribeToTopics(gossipSub, @[topic])
# Now unsubscribe from the topic
unsubscribeFromTopics(gossipSub, @[topic])
# Check that peers are removed from the mesh but still present in gossipsub
check topic notin gossipSub.mesh
check topic in gossipSub.gossipsub
# Clean up by closing connections and stopping the gossipSub switch
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()