Compare commits

...

11 Commits

Author SHA1 Message Date
ufarooqstatus
3fb83b8672 Remove peers from relay (ToSendPeers) list, from which message or idontwant is received. We delay this action till sendMsg to increase such peers 2024-02-19 14:21:39 +05:00
diegomrsantos
fe4ff79885 feat: message prioritization with immediate peer-published dispatch and queuing for other msgs (#1015) 2024-02-16 10:54:16 +01:00
Álex Cabeza Romero
aa4ebb0b3c docs(general): Improve docs (#1021) 2024-02-15 16:14:26 +01:00
diegomrsantos
e0f70b7177 improvement: enhanced checkExpiring macro with custom timeout (#1023) 2024-02-09 11:51:27 +01:00
Ludovic Chenut
c1dfd58772 fix: yamux metrics (#1022) 2024-02-08 12:36:58 +01:00
Álex Cabeza Romero
04af0c4323 test(flaky): Log checkExpiring failure (#1018)
Add simple logging mechanism on checkExpiring failure.
2024-02-06 17:47:13 +01:00
Ludovic Chenut
eb0890cd6f docs: add comments and improve yamux readability (#1006) 2024-02-02 15:14:02 +01:00
Álex Cabeza Romero
9bc5ec1566 tests(flaky): Increase check timeouts (#995)
Increase checkExpiring timeouts to verify impact on flaky tests.
2024-01-31 23:46:12 +00:00
diegomrsantos
5594bcb33e fix: more metrics issues when libp2p_expensive_metrics is enabled (#1016) 2024-01-30 16:55:55 +01:00
diegomrsantos
d46bcdb6ac fix: compilation issue when libp2p_expensive_metrics is enabled. (#1014) 2024-01-29 11:31:11 +01:00
diegomrsantos
9468bb6b4d fix(hole-punching-interop): update nim to 1.6.16 (#1012) 2024-01-26 11:15:40 +01:00
25 changed files with 463 additions and 178 deletions

1
.gitignore vendored
View File

@@ -16,3 +16,4 @@ tests/pubsub/testgossipsub
examples/*.md
nimble.develop
nimble.paths
go-libp2p-daemon/

View File

@@ -20,6 +20,7 @@
- [Background](#background)
- [Install](#install)
- [Getting Started](#getting-started)
- [Go-libp2p-daemon](#go-libp2p-daemon)
- [Modules](#modules)
- [Users](#users)
- [Stability](#stability)
@@ -40,6 +41,8 @@ Learn more about libp2p at [**libp2p.io**](https://libp2p.io) and follow libp2p'
## Install
**Prerequisite**
- [Nim](https://nim-lang.org/install.html)
> The currently supported Nim version is 1.6.18.
```
nimble install libp2p
```
@@ -47,11 +50,11 @@ nimble install libp2p
## Getting Started
You'll find the nim-libp2p documentation [here](https://status-im.github.io/nim-libp2p/docs/).
**Go Daemon:**
Please find the installation and usage intructions in [daemonapi.md](examples/go-daemon/daemonapi.md).
### Testing
Remember you'll need to build the `go-libp2p-daemon` binary to run the `nim-libp2p` tests.
To do so, please follow the installation instructions in [daemonapi.md](examples/go-daemon/daemonapi.md).
## Modules
List of packages modules implemented in nim-libp2p:
| Name | Description |

View File

@@ -1,6 +1,8 @@
# Table of Contents
- [Introduction](#introduction)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Script](#script)
- [Usage](#usage)
- [Example](#example)
- [Getting Started](#getting-started)
@@ -8,26 +10,29 @@
# Introduction
This is a libp2p-backed daemon wrapping the functionalities of go-libp2p for use in Nim. <br>
For more information about the go daemon, check out [this repository](https://github.com/libp2p/go-libp2p-daemon).
> **Required only** for running the tests.
# Prerequisites
Go with version `1.15.15`.
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.
# Installation
Follow one of the methods below:
## Script
Run the build script while having the `go` command pointing to the correct Go version.
We recommend using `1.15.15`, as previously stated.
```sh
# clone and install dependencies
git clone https://github.com/status-im/nim-libp2p
cd nim-libp2p
nimble install
# perform unit tests
nimble test
# update the git submodule to install the go daemon
git submodule update --init --recursive
go version
git clone https://github.com/libp2p/go-libp2p-daemon
cd go-libp2p-daemon
git checkout v0.0.1
go install ./...
cd ..
./scripts/build_p2pd.sh
```
If everything goes correctly, the binary (`p2pd`) should be built and placed in the correct directory.
If you find any issues, please head into our discord and ask for our asistance.
After successfully building the binary, remember to add it to your path so it can be found. You can do that by running:
```sh
export PATH="$PATH:$HOME/go/bin"
```
> **Tip:** To make this change permanent, add the command above to your `.bashrc` file.
# Usage

View File

@@ -29,9 +29,9 @@ const
when defined(libp2p_yamux_metrics):
declareGauge(libp2p_yamux_channels, "yamux channels", labels = ["initiator", "peer"])
declareHistogram libp2p_yamux_send_queue, "message send queue length (in byte)",
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 1600.0, 6400.0, 25600.0, 256000.0]
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
declareHistogram libp2p_yamux_recv_queue, "message recv queue length (in byte)",
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 1600.0, 6400.0, 25600.0, 256000.0]
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
type
YamuxError* = object of CatchableError
@@ -172,12 +172,16 @@ proc `$`(channel: YamuxChannel): string =
result &= " {" & s.foldl(if a != "": a & ", " & b else: b, "") & "}"
proc lengthSendQueue(channel: YamuxChannel): int =
## Returns the length of what remains to be sent
##
channel.sendQueue.foldl(a + b.data.len - b.sent, 0)
proc lengthSendQueueWithLimit(channel: YamuxChannel): int =
## Returns the length of what remains to be sent, but limit the size of big messages.
##
# For leniency, limit big messages size to the third of maxSendQueueSize
# This value is arbitrary, it's not in the specs
# It permits to store up to 3 big messages if the peer is stalling.
# This value is arbitrary, it's not in the specs, it permits to store up to
# 3 big messages if the peer is stalling.
channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0)
proc actuallyClose(channel: YamuxChannel) {.async.} =
@@ -200,6 +204,9 @@ method closeImpl*(channel: YamuxChannel) {.async.} =
await channel.actuallyClose()
proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
# If we reset locally, we want to flush up to a maximum of recvWindow
# bytes. It's because the peer we're connected to can send us data before
# it receives the reset.
if channel.isReset:
return
trace "Reset channel"
@@ -220,11 +227,14 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
await channel.remoteClosed()
channel.receivedData.fire()
if not isLocal:
# If we reset locally, we want to flush up to a maximum of recvWindow
# bytes. We use the recvWindow in the proc cleanupChann.
# If the reset is remote, there's no reason to flush anything.
channel.recvWindow = 0
proc updateRecvWindow(channel: YamuxChannel) {.async.} =
## Send to the peer a window update when the recvWindow is empty enough
##
# In order to avoid spamming a window update everytime a byte is read,
# we send it everytime half of the maxRecvWindow is read.
let inWindow = channel.recvWindow + channel.recvQueue.len
if inWindow > channel.maxRecvWindow div 2:
return
@@ -242,6 +252,7 @@ method readOnce*(
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
## Read from a yamux channel
if channel.isReset:
raise if channel.remoteReset:
@@ -287,17 +298,18 @@ proc trySend(channel: YamuxChannel) {.async.} =
return
channel.isSending = true
defer: channel.isSending = false
while channel.sendQueue.len != 0:
channel.sendQueue.keepItIf(not (it.fut.cancelled() and it.sent == 0))
if channel.sendWindow == 0:
trace "send window empty"
trace "trying to send while the sendWindow is empty"
if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize:
debug "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize,
trace "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize,
currentQueueSize = channel.lengthSendQueueWithLimit()
try:
await channel.reset(true)
except CatchableError as exc:
debug "failed to reset", msg=exc.msg
warn "failed to reset", msg=exc.msg
break
let
@@ -316,20 +328,24 @@ proc trySend(channel: YamuxChannel) {.async.} =
var futures: seq[Future[void]]
while inBuffer < toSend:
# concatenate the different message we try to send into one buffer
let (data, sent, fut) = channel.sendQueue[0]
let bufferToSend = min(data.len - sent, toSend - inBuffer)
sendBuffer.toOpenArray(12, 12 + toSend - 1)[inBuffer..<(inBuffer+bufferToSend)] =
channel.sendQueue[0].data.toOpenArray(sent, sent + bufferToSend - 1)
channel.sendQueue[0].sent.inc(bufferToSend)
if channel.sendQueue[0].sent >= data.len:
# if every byte of the message is in the buffer, add the write future to the
# sequence of futures to be completed (or failed) when the buffer is sent
futures.add(fut)
channel.sendQueue.delete(0)
inBuffer.inc(bufferToSend)
trace "build send buffer", h = $header, msg=string.fromBytes(sendBuffer[12..^1])
trace "try to send the buffer", h = $header
channel.sendWindow.dec(toSend)
try: await channel.conn.write(sendBuffer)
except CatchableError as exc:
trace "failed to send the buffer"
let connDown = newLPStreamConnDownError(exc)
for fut in futures.items():
fut.fail(connDown)
@@ -340,6 +356,8 @@ proc trySend(channel: YamuxChannel) {.async.} =
channel.activity = true
method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
## Write to yamux channel
##
result = newFuture[void]("Yamux Send")
if channel.remoteReset:
result.fail(newLPStreamResetError())
@@ -352,10 +370,12 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
return result
channel.sendQueue.add((msg, 0, result))
when defined(libp2p_yamux_metrics):
libp2p_yamux_recv_queue.observe(channel.lengthSendQueue().int64)
libp2p_yamux_send_queue.observe(channel.lengthSendQueue().int64)
asyncSpawn channel.trySend()
proc open*(channel: YamuxChannel) {.async.} =
proc open(channel: YamuxChannel) {.async.} =
## Open a yamux channel by sending a window update with Syn or Ack flag
##
if channel.opened:
trace "Try to open channel twice"
return
@@ -381,7 +401,7 @@ proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values():
if v.isSrc == isSrc: result += 1
proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} =
proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async.} =
await channel.join()
m.channels.del(channel.id)
when defined(libp2p_yamux_metrics):
@@ -419,7 +439,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
when defined(libp2p_agents_metrics):
result.shortAgent = m.connection.shortAgent
m.channels[id] = result
asyncSpawn m.cleanupChann(result)
asyncSpawn m.cleanupChannel(result)
trace "created channel", id, pid=m.connection.peerId
when defined(libp2p_yamux_metrics):
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId])
@@ -440,7 +460,7 @@ method close*(m: Yamux) {.async.} =
trace "Closed yamux"
proc handleStream(m: Yamux, channel: YamuxChannel) {.async.} =
## call the muxer stream handler for this channel
## Call the muxer stream handler for this channel
##
try:
await m.streamHandler(channel)
@@ -474,6 +494,7 @@ method handle*(m: Yamux) {.async.} =
else:
if header.streamId in m.flushed:
m.flushed.del(header.streamId)
if header.streamId mod 2 == m.currentId mod 2:
debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId
raise newException(YamuxError, "Peer used our reserved stream id")

View File

@@ -157,7 +157,7 @@ method rpcHandler*(f: FloodSub,
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
trace "Forwared message to peers", peers = toSendPeers.len
f.updateMetrics(rpcMsg)
@@ -219,7 +219,7 @@ method publish*(f: FloodSub,
return 0
# Try to send to all peers that are known to be interested
f.broadcast(peers, RPCMsg(messages: @[msg]))
f.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
when defined(libp2p_expensive_metrics):
libp2p_pubsub_messages_published.inc(labelValues = [topic])

View File

@@ -46,6 +46,9 @@ declareCounter(libp2p_gossipsub_saved_bytes, "bytes saved by gossipsub optimizat
declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")
when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
proc init*(_: type[GossipSubParams]): GossipSubParams =
GossipSubParams(
explicit: true,
@@ -217,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0
pubSubPeer.stopSendNonPriorityTask()
procCall FloodSub(g).unsubscribePeer(peer)
proc handleSubscribe*(g: GossipSub,
@@ -276,12 +281,28 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)
if
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages
let
isPruneNotEmpty = respControl.prune.len > 0
isIWantNotEmpty = respControl.iwant.len > 0
if isPruneNotEmpty or isIWantNotEmpty:
if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
if isPruneNotEmpty:
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl)), isHighPriority = true)
if messages.len > 0:
for smsg in messages:
for topic in smsg.topicIds:
if g.knownTopics.contains(topic):
@@ -289,18 +310,11 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
trace "sending control message", msg = shortLog(respControl), peer
# iwant replies have lower priority
trace "sending iwant reply messages", peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))
RPCMsg(messages: messages), isHighPriority = false)
proc validateAndRelay(g: GossipSub,
msg: Message,
@@ -353,7 +367,7 @@ proc validateAndRelay(g: GossipSub,
if msg.data.len > msgId.len * 10:
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[msgId])]
))))
))), isHighPriority = true)
for peer in toSendPeers:
for heDontWant in peer.heDontWants:
@@ -367,7 +381,7 @@ proc validateAndRelay(g: GossipSub,
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, validMsgId = msgId)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue
@@ -426,6 +440,11 @@ method rpcHandler*(g: GossipSub,
await rateLimit(g, peer, Opt.none(RPCMsg), msgSize)
return
when defined(libp2p_expensive_metrics):
for m in rpcMsg.messages:
for t in m.topicIds:
libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, t])
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
await rateLimit(g, peer, Opt.some(rpcMsg), msgSize)
@@ -433,7 +452,7 @@ method rpcHandler*(g: GossipSub,
peer.recvObservers(rpcMsg)
if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
@@ -478,6 +497,9 @@ method rpcHandler*(g: GossipSub,
libp2p_gossipsub_duplicate.inc()
if msg.data.len > msgId.len * 10: #Dont relay to the peers from which we already received (We just do it for large messages)
peer.heDontWants[^1].incl(msgId)
# onto the next message
continue
@@ -543,7 +565,7 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.unsubscribeBackoff.seconds.uint64)])))
g.broadcast(mpeers, msg)
g.broadcast(mpeers, msg, isHighPriority = true)
for peer in mpeers:
g.pruned(peer, topic, backoff = some(g.parameters.unsubscribeBackoff))
@@ -647,7 +669,7 @@ method publish*(g: GossipSub,
g.mcache.put(msgId, msg)
g.broadcast(peers, RPCMsg(messages: @[msg]))
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])

View File

@@ -530,14 +530,14 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
# Send changes to peers after table updates to avoid stale state
if grafts.len > 0:
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
g.broadcast(grafts, graft)
g.broadcast(grafts, graft, isHighPriority = true)
if prunes.len > 0:
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
g.broadcast(prunes, prune, isHighPriority = true)
proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
# drop peers that we haven't published to in
@@ -669,7 +669,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
topicID: t,
peers: g.peerExchangeList(t),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
g.broadcast(prunes, prune, isHighPriority = true)
# pass by ptr in order to both signal we want to update metrics
# and as well update the struct for each topic during this iteration
@@ -691,7 +691,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
else:
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
g.send(peer, RPCMsg(control: some(control)))
g.send(peer, RPCMsg(control: some(control)), isHighPriority = true)
g.mcache.shift() # shift the cache

View File

@@ -138,18 +138,35 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
libp2p_pubsub_peers.set(p.peers.len.int64)
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
## Attempt to send `msg` to remote peer
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool, id: MessageId = @[]) {.raises: [].} =
## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
##
## Parameters:
## - `p`: The `PubSub` instance.
## - `peer`: An instance of `PubSubPeer` representing the peer to whom the message should be sent.
## - `msg`: The `RPCMsg` instance that contains the message to be sent.
## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.
trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg, p.anonymize)
asyncSpawn peer.send(msg, p.anonymize, isHighPriority, id)
proc broadcast*(
p: PubSub,
sendPeers: auto, # Iteratble[PubSubPeer]
msg: RPCMsg) {.raises: [].} =
## Attempt to send `msg` to the given peers
msg: RPCMsg,
isHighPriority: bool,
validMsgId: MessageId = @[]) {.raises: [].} =
## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
##
## Parameters:
## - `p`: The `PubSub` instance.
## - `sendPeers`: An iterable of `PubSubPeer` instances representing the peers to whom the message should be sent.
## - `msg`: The `RPCMsg` instance that contains the message to be broadcast.
## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.
let npeers = sendPeers.len.int64
for sub in msg.subscriptions:
@@ -195,19 +212,19 @@ proc broadcast*(
if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
p.send(peer, msg)
p.send(peer, msg, isHighPriority, validMsgId)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
asyncSpawn peer.sendEncoded(encoded)
asyncSpawn peer.sendEncoded(encoded, isHighPriority, validMsgId)
proc sendSubs*(p: PubSub,
peer: PubSubPeer,
topics: openArray[string],
subscribe: bool) =
## send subscriptions to remote peer
p.send(peer, RPCMsg.withSubs(topics, subscribe))
p.send(peer, RPCMsg.withSubs(topics, subscribe), isHighPriority = true)
for topic in topics:
if subscribe:

View File

@@ -21,6 +21,8 @@ import rpc/[messages, message, protobuf],
../../protobuf/minprotobuf,
../../utility
#import gossipsub/libp2p_gossipsub_staggerDontWantSave
export peerid, connection, deques
logScope:
@@ -28,10 +30,12 @@ logScope:
when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
type
PeerRateLimitError* = object of CatchableError
@@ -50,6 +54,18 @@ type
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
MessageWithId = object
message: seq[byte]
msgId: MessageId
RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
nonPriorityQueue: AsyncQueue[MessageWithId]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]
PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
onEvent*: OnEvent # Connectivity updates for peer
@@ -71,6 +87,8 @@ type
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]
rpcmessagequeue: RpcMessageQueue
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}
@@ -83,6 +101,20 @@ when defined(libp2p_agents_metrics):
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent
proc newMessageWithId(msg: seq[byte], id: MessageId): MessageWithId =
result.message = msg
result.msgId = id
proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
func hash*(p: PubSubPeer): Hash =
p.peerId.hash
@@ -138,12 +170,6 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
conn, peer = p, closed = conn.closed,
data = data.shortLog
when defined(libp2p_expensive_metrics):
for m in rmsg.messages:
for t in m.topicIDs:
# metrics
libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])
await p.handler(p, data)
data = newSeq[byte]() # Release memory
except PeerRateLimitError as exc:
@@ -230,21 +256,17 @@ proc hasSendConn*(p: PubSubPeer): bool =
template sendMetrics(msg: RPCMsg): untyped =
when defined(libp2p_expensive_metrics):
for x in msg.messages:
for t in x.topicIDs:
for t in x.topicIds:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")
if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return
if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return
proc clearSendPriorityQueue(p: PubSubPeer) =
while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
proc sendMsg(p: PubSubPeer, msg: seq[byte], msgId: MessageId) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
@@ -257,6 +279,12 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
if msgId.len > 0:
for dontWants in p.heDontWants:
if msgId in dontWants:
#libp2p_gossipsub_staggerDontWantSave.inc()
trace "Skipped sending msg/dontwant received from peer", conn, encoded = shortLog(msg)
return
try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
@@ -269,6 +297,38 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
await conn.close() # This will clean up the send connection
proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} =
## Asynchronously sends an encoded message to a specified `PubSubPeer`.
##
## Parameters:
## - `p`: The `PubSubPeer` instance to which the message is to be sent.
## - `msg`: The message to be sent, encoded as a sequence of bytes (`seq[byte]`).
## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.
doAssert(not isNil(p), "pubsubpeer nil!")
if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return
if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return
if isHighPriority:
p.clearSendPriorityQueue()
let f = p.sendMsg(msg, validMsgId)
if not f.finished:
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithId(msg, validMsgId))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
@@ -304,7 +364,16 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} =
## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
##
## Parameters:
## - `p`: The `PubSubPeer` instance to which the message is to be sent.
## - `msg`: The `RPCMsg` instance representing the message to be sent.
## - `anonymize`: A boolean flag indicating whether the message should be sent with anonymization.
## - `isHighPriority`: A boolean flag indicating whether the message should be treated as high priority.
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.
# When sending messages, we take care to re-encode them with the right
# anonymization flag to ensure that we're not penalized for sending invalid
# or malicious data on the wire - in particular, re-encoding protects against
@@ -324,11 +393,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
asyncSpawn p.sendEncoded(encodedSplitMsg)
await p.sendEncoded(encodedSplitMsg, isHighPriority, validMsgId)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded)
await p.sendEncoded(encoded, isHighPriority, validMsgId)
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
@@ -337,6 +406,43 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
return true
return false
proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
while true:
# we send non-priority messages only if there are no pending priority messages
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
p.clearSendPriorityQueue()
# this minimizes the number of times we have to wait for something (each wait = performance cost)
# we will never wait for a finished future and by waiting for the last one, all that come before it are guaranteed
# to be finished already (since sends are processed in order).
if p.rpcmessagequeue.sendPriorityQueue.len > 0:
await p.rpcmessagequeue.sendPriorityQueue[^1]
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
await p.sendMsg(msg.message, msg.msgId)
proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
if p.rpcmessagequeue.sendNonPriorityTask.isNil:
p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()
proc stopSendNonPriorityTask*(p: PubSubPeer) =
if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
debug "stopping sendNonPriorityTask", p
p.rpcmessagequeue.sendNonPriorityTask.cancel()
p.rpcmessagequeue.sendNonPriorityTask = nil
p.rpcmessagequeue.sendPriorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
proc new(T: typedesc[RpcMessageQueue]): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[MessageWithId](),
)
proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
@@ -353,17 +459,9 @@ proc new*(
peerId: peerId,
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize,
overheadRateLimitOpt: overheadRateLimitOpt
overheadRateLimitOpt: overheadRateLimitOpt,
rpcmessagequeue: RpcMessageQueue.new(),
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))
proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
result.startSendNonPriorityTask()

View File

@@ -89,6 +89,7 @@ build_target() {
mkdir "$CACHE_DIR"
cp -a "$TARGET_DIR"/* "$CACHE_DIR"/
fi
echo "Binary built successfully."
}
if target_needs_rebuilding; then

View File

@@ -1,6 +1,7 @@
{.push raises: [].}
import chronos
import macros
import algorithm
import ../libp2p/transports/tcptransport
@@ -110,19 +111,83 @@ proc bridgedConnections*: (Connection, Connection) =
await connA.pushData(data)
return (connA, connB)
proc checkExpiringInternal(cond: proc(): bool {.raises: [], gcsafe.} ): Future[bool] {.async.} =
let start = Moment.now()
while true:
if Moment.now() > (start + chronos.seconds(5)):
return false
elif cond():
return true
macro checkUntilCustomTimeout*(timeout: Duration, code: untyped): untyped =
## Periodically checks a given condition until it is true or a timeout occurs.
##
## `code`: untyped - A condition expression that should eventually evaluate to true.
## `timeout`: Duration - The maximum duration to wait for the condition to be true.
##
## Examples:
## ```nim
## # Example 1:
## asyncTest "checkUntilCustomTimeout should pass if the condition is true":
## let a = 2
## let b = 2
## checkUntilCustomTimeout(2.seconds):
## a == b
##
## # Example 2: Multiple conditions
## asyncTest "checkUntilCustomTimeout should pass if the conditions are true":
## let a = 2
## let b = 2
## checkUntilCustomTimeout(5.seconds)::
## a == b
## a == 2
## b == 1
## ```
# Helper proc to recursively build a combined boolean expression
proc buildAndExpr(n: NimNode): NimNode =
if n.kind == nnkStmtList and n.len > 0:
var combinedExpr = n[0] # Start with the first expression
for i in 1..<n.len:
# Combine the current expression with the next using 'and'
combinedExpr = newCall("and", combinedExpr, n[i])
return combinedExpr
else:
await sleepAsync(1.millis)
return n
template checkExpiring*(code: untyped): untyped =
check await checkExpiringInternal(proc(): bool = code)
# Build the combined expression
let combinedBoolExpr = buildAndExpr(code)
result = quote do:
proc checkExpiringInternal(): Future[void] {.gensym, async.} =
let start = Moment.now()
while true:
if Moment.now() > (start + `timeout`):
checkpoint("[TIMEOUT] Timeout was reached and the conditions were not true. Check if the code is working as " &
"expected or consider increasing the timeout param.")
check `code`
return
else:
if `combinedBoolExpr`:
return
else:
await sleepAsync(1.millis)
await checkExpiringInternal()
macro checkUntilTimeout*(code: untyped): untyped =
## Same as `checkUntilCustomTimeout` but with a default timeout of 10 seconds.
##
## Examples:
## ```nim
## # Example 1:
## asyncTest "checkUntilTimeout should pass if the condition is true":
## let a = 2
## let b = 2
## checkUntilTimeout:
## a == b
##
## # Example 2: Multiple conditions
## asyncTest "checkUntilTimeout should pass if the conditions are true":
## let a = 2
## let b = 2
## checkUntilTimeout:
## a == b
## a == 2
## b == 1
## ```
result = quote do:
checkUntilCustomTimeout(10.seconds, `code`)
proc unorderedCompare*[T](a, b: seq[T]): bool =
if a == b:

View File

@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.5-labs
FROM nimlang/nim:1.6.14 as builder
FROM nimlang/nim:1.6.16 as builder
WORKDIR /workspace

View File

@@ -361,7 +361,7 @@ suite "FloodSub":
check (await smallNode[0].publish("foo", smallMessage1)) > 0
check (await bigNode[0].publish("foo", smallMessage2)) > 0
checkExpiring: messageReceived == 2
checkUntilTimeout: messageReceived == 2
check (await smallNode[0].publish("foo", bigMessage)) > 0
check (await bigNode[0].publish("foo", bigMessage)) > 0
@@ -396,7 +396,7 @@ suite "FloodSub":
check (await bigNode1[0].publish("foo", bigMessage)) > 0
checkExpiring: messageReceived == 1
checkUntilTimeout: messageReceived == 1
await allFuturesThrowing(
bigNode1[0].switch.stop(),

View File

@@ -779,9 +779,9 @@ suite "GossipSub internal":
gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
))))
))), isHighPriority = false)
checkExpiring: receivedMessages[] == sentMessages
checkUntilTimeout: receivedMessages[] == sentMessages
check receivedMessages[].len == 2
await teardownTest(gossip0, gossip1)
@@ -796,10 +796,10 @@ suite "GossipSub internal":
gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)
await sleepAsync(300.milliseconds)
checkExpiring: receivedMessages[].len == 0
checkUntilTimeout: receivedMessages[].len == 0
await teardownTest(gossip0, gossip1)
@@ -813,9 +813,9 @@ suite "GossipSub internal":
gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)
checkExpiring: receivedMessages[] == sentMessages
checkUntilTimeout: receivedMessages[] == sentMessages
check receivedMessages[].len == 2
await teardownTest(gossip0, gossip1)
@@ -831,7 +831,7 @@ suite "GossipSub internal":
gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)
var smallestSet: HashSet[seq[byte]]
let seqs = toSeq(sentMessages)
@@ -840,7 +840,7 @@ suite "GossipSub internal":
else:
smallestSet.incl(seqs[1])
checkExpiring: receivedMessages[] == smallestSet
checkUntilTimeout: receivedMessages[] == smallestSet
check receivedMessages[].len == 1
await teardownTest(gossip0, gossip1)

View File

@@ -310,9 +310,9 @@ suite "GossipSub":
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
checkExpiring:
"foobar" in gossip2.topics and
"foobar" in gossip1.gossipsub and
checkUntilTimeout:
"foobar" in gossip2.topics
"foobar" in gossip1.gossipsub
gossip1.gossipsub.hasPeerId("foobar", gossip2.peerInfo.peerId)
await allFuturesThrowing(
@@ -454,9 +454,9 @@ suite "GossipSub":
nodes[1].subscribe("foobar", handler)
let gsNode = GossipSub(nodes[1])
checkExpiring:
gsNode.mesh.getOrDefault("foobar").len == 0 and
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 and
checkUntilTimeout:
gsNode.mesh.getOrDefault("foobar").len == 0
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
(
GossipSub(nodes[0]).gossipsub.getOrDefault("foobar").len == 1 or
GossipSub(nodes[0]).fanout.getOrDefault("foobar").len == 1
@@ -572,16 +572,16 @@ suite "GossipSub":
gossip1.seen = TimedCache[MessageId].init()
gossip3.seen = TimedCache[MessageId].init()
let msgId = toSeq(gossip2.validationSeen.keys)[0]
checkExpiring(try: gossip2.validationSeen[msgId].len > 0 except: false)
checkUntilTimeout(try: gossip2.validationSeen[msgId].len > 0 except: false)
result = ValidationResult.Accept
bFinished.complete()
nodes[1].addValidator("foobar", slowValidator)
checkExpiring(
gossip1.mesh.getOrDefault("foobar").len == 2 and
gossip2.mesh.getOrDefault("foobar").len == 2 and
gossip3.mesh.getOrDefault("foobar").len == 2)
checkUntilTimeout:
gossip1.mesh.getOrDefault("foobar").len == 2
gossip2.mesh.getOrDefault("foobar").len == 2
gossip3.mesh.getOrDefault("foobar").len == 2
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 2
await bFinished
@@ -676,7 +676,7 @@ suite "GossipSub":
# Now try with a mesh
gossip1.subscribe("foobar", handler)
checkExpiring: gossip1.mesh.peers("foobar") > 5
checkUntilTimeout: gossip1.mesh.peers("foobar") > 5
# use a different length so that the message is not equal to the last
check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == numPeersSecondMsg
@@ -912,14 +912,14 @@ suite "GossipSub":
gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
))))
checkExpiring: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)
))), isHighPriority = true)
checkUntilTimeout: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)
tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1
await bFinished
checkExpiring: toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 1)
checkUntilTimeout: toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 1)
check: toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 0)
await allFuturesThrowing(
@@ -968,7 +968,10 @@ suite "GossipSub":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]))
gossip0.broadcast(
gossip0.mesh["foobar"],
RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]),
isHighPriority = true)
await sleepAsync(300.millis)
check currentRateLimitHits() == rateLimitHits
@@ -976,7 +979,10 @@ suite "GossipSub":
# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]))
gossip0.broadcast(
gossip0.mesh["foobar"],
RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]),
isHighPriority = true)
await sleepAsync(300.millis)
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
@@ -990,7 +996,7 @@ suite "GossipSub":
let (nodes, gossip0, gossip1) = await initializeGossipTest()
# Simulate sending an undecodable message
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte))
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte), isHighPriority = true)
await sleepAsync(300.millis)
check currentRateLimitHits() == rateLimitHits + 1
@@ -998,9 +1004,9 @@ suite "GossipSub":
# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte))
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte), isHighPriority = true)
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
await stopNodes(nodes)
@@ -1014,7 +1020,7 @@ suite "GossipSub":
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg)
gossip0.broadcast(gossip0.mesh["foobar"], msg, isHighPriority = true)
await sleepAsync(300.millis)
check currentRateLimitHits() == rateLimitHits + 1
@@ -1027,9 +1033,9 @@ suite "GossipSub":
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg2)
gossip0.broadcast(gossip0.mesh["foobar"], msg2, isHighPriority = true)
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
await stopNodes(nodes)
@@ -1049,7 +1055,7 @@ suite "GossipSub":
let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])
gossip0.broadcast(gossip0.mesh[topic], msg)
gossip0.broadcast(gossip0.mesh[topic], msg, isHighPriority = true)
await sleepAsync(300.millis)
check currentRateLimitHits() == rateLimitHits + 1
@@ -1057,9 +1063,12 @@ suite "GossipSub":
# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]))
gossip0.broadcast(
gossip0.mesh[topic],
RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]),
isHighPriority = true)
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
await stopNodes(nodes)

View File

@@ -215,7 +215,7 @@ suite "Connection Manager":
await connMngr.close()
checkExpiring: waitedConn3.cancelled()
checkUntilTimeout: waitedConn3.cancelled()
await allFuturesThrowing(
allFutures(muxs.mapIt( it.close() )))
@@ -231,7 +231,7 @@ suite "Connection Manager":
await muxer.close()
checkExpiring: muxer notin connMngr
checkUntilTimeout: muxer notin connMngr
await connMngr.close()
@@ -254,7 +254,7 @@ suite "Connection Manager":
check peerId in connMngr
await connMngr.dropPeer(peerId)
checkExpiring: peerId notin connMngr
checkUntilTimeout: peerId notin connMngr
check isNil(connMngr.selectMuxer(peerId, Direction.In))
check isNil(connMngr.selectMuxer(peerId, Direction.Out))

View File

@@ -64,7 +64,7 @@ suite "Dcutr":
await DcutrClient.new().startSync(behindNATSwitch, publicSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
.wait(300.millis)
checkExpiring:
checkUntilTimeout:
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 2
@@ -83,7 +83,7 @@ suite "Dcutr":
body
checkExpiring:
checkUntilTimeout:
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 2
@@ -150,7 +150,7 @@ suite "Dcutr":
await DcutrClient.new().startSync(behindNATSwitch, publicSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
.wait(300.millis)
checkExpiring:
checkUntilTimeout:
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 1

42
tests/testhelpers.nim Normal file
View File

@@ -0,0 +1,42 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import ./helpers
suite "Helpers":
asyncTest "checkUntilTimeout should pass if the condition is true":
let a = 2
let b = 2
checkUntilTimeout:
a == b
asyncTest "checkUntilTimeout should pass if the conditions are true":
let a = 2
let b = 2
checkUntilTimeout:
a == b
a == 2
b == 2
asyncTest "checkUntilCustomTimeout should pass when the condition is true":
let a = 2
let b = 2
checkUntilCustomTimeout(2.seconds):
a == b
asyncTest "checkUntilCustomTimeout should pass when the conditions are true":
let a = 2
let b = 2
checkUntilCustomTimeout(5.seconds):
a == b
a == 2
b == 2

View File

@@ -89,8 +89,8 @@ suite "Hole Punching":
await publicPeerSwitch.connect(privatePeerSwitch.peerInfo.peerId, (await privatePeerRelayAddr))
checkExpiring:
privatePeerSwitch.connManager.connCount(publicPeerSwitch.peerInfo.peerId) == 1 and
checkUntilTimeout:
privatePeerSwitch.connManager.connCount(publicPeerSwitch.peerInfo.peerId) == 1
not isRelayed(privatePeerSwitch.connManager.selectMuxer(publicPeerSwitch.peerInfo.peerId).connection)
await allFuturesThrowing(
@@ -127,8 +127,8 @@ suite "Hole Punching":
await publicPeerSwitch.connect(privatePeerSwitch.peerInfo.peerId, (await privatePeerRelayAddr))
checkExpiring:
privatePeerSwitch.connManager.connCount(publicPeerSwitch.peerInfo.peerId) == 1 and
checkUntilTimeout:
privatePeerSwitch.connManager.connCount(publicPeerSwitch.peerInfo.peerId) == 1
not isRelayed(privatePeerSwitch.connManager.selectMuxer(publicPeerSwitch.peerInfo.peerId).connection)
await allFuturesThrowing(

View File

@@ -219,8 +219,8 @@ suite "Identify":
await identifyPush2.push(switch2.peerInfo, conn)
checkExpiring: switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
checkExpiring: switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
checkUntilTimeout: switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
checkUntilTimeout: switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
await closeAll()

View File

@@ -829,7 +829,7 @@ suite "Mplex":
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10
await mplexListen.close()
await allFuturesThrowing(
@@ -876,7 +876,7 @@ suite "Mplex":
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10
mplexHandle.cancel()
await allFuturesThrowing(
@@ -920,7 +920,7 @@ suite "Mplex":
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10
await conn.close()
await allFuturesThrowing(
@@ -967,7 +967,7 @@ suite "Mplex":
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10
await listenConn.closeWithEOF()
await allFuturesThrowing(

View File

@@ -57,4 +57,5 @@ import testtcptransport,
testautorelay,
testdcutr,
testhpservice,
testutility
testutility,
testhelpers

View File

@@ -62,8 +62,8 @@ suite "RendezVous Interface":
dm.advertise(RdvNamespace("ns1"))
dm.advertise(RdvNamespace("ns2"))
checkExpiring: rdv.numAdvertiseNs1 >= 5
checkExpiring: rdv.numAdvertiseNs2 >= 5
checkUntilTimeout: rdv.numAdvertiseNs1 >= 5
checkUntilTimeout: rdv.numAdvertiseNs2 >= 5
await client.stop()
asyncTest "Check timeToAdvertise interval":

View File

@@ -283,12 +283,12 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)
check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)
checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)
checkExpiring:
checkUntilTimeout:
startCounts ==
@[
switch1.connManager.inSema.count, switch1.connManager.outSema.count,
@@ -336,7 +336,7 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)
check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)
checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)
@@ -388,7 +388,7 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)
check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)
checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)
@@ -439,7 +439,7 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)
check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)
checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)
@@ -490,7 +490,7 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)
check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)
checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)
@@ -554,8 +554,8 @@ suite "Switch":
check not switch2.isConnected(switch1.peerInfo.peerId)
check not switch3.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch3.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch3.peerInfo.peerId)
checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)
@@ -711,7 +711,7 @@ suite "Switch":
await allFuturesThrowing(readers)
await switch2.stop() #Otherwise this leaks
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)
checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)

View File

@@ -36,8 +36,8 @@ suite "Yamux":
yamuxa.close(), yamuxb.close(),
handlera, handlerb)
suite "Basic":
asyncTest "Simple test":
suite "Simple Reading/Writing yamux messages":
asyncTest "Roundtrip of small messages":
mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =