Mirror of https://github.com/vacp2p/nim-libp2p.git (synced 2026-01-10 11:48:15 -05:00)

Compare commits: dev/etan/z...limit_forw (11 commits)
| SHA1 |
|---|
| 3fb83b8672 |
| fe4ff79885 |
| aa4ebb0b3c |
| e0f70b7177 |
| c1dfd58772 |
| 04af0c4323 |
| eb0890cd6f |
| 9bc5ec1566 |
| 5594bcb33e |
| d46bcdb6ac |
| 9468bb6b4d |
.gitignore (vendored): 1 change
@@ -16,3 +16,4 @@ tests/pubsub/testgossipsub
examples/*.md
nimble.develop
nimble.paths
go-libp2p-daemon/
@@ -20,6 +20,7 @@
- [Background](#background)
- [Install](#install)
- [Getting Started](#getting-started)
- [Go-libp2p-daemon](#go-libp2p-daemon)
- [Modules](#modules)
- [Users](#users)
- [Stability](#stability)
@@ -40,6 +41,8 @@ Learn more about libp2p at [**libp2p.io**](https://libp2p.io) and follow libp2p'
## Install
**Prerequisite**
- [Nim](https://nim-lang.org/install.html)
> The currently supported Nim version is 1.6.18.

```
nimble install libp2p
```
@@ -47,11 +50,11 @@ nimble install libp2p
## Getting Started
You'll find the nim-libp2p documentation [here](https://status-im.github.io/nim-libp2p/docs/).

**Go Daemon:**
Please find the installation and usage instructions in [daemonapi.md](examples/go-daemon/daemonapi.md).
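Not part of this diff, but as a quick orientation: a minimal node can be spun up with nim-libp2p's builder API. The sketch below assumes `newStandardSwitch` from the installed `libp2p` package's documented API; treat it as illustrative only.

```nim
import chronos
import libp2p

proc main() {.async.} =
  let switch = newStandardSwitch()  # default transport/security/muxer stack
  await switch.start()
  echo "listening on ", switch.peerInfo.addrs
  await switch.stop()

waitFor main()
```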
### Testing
Remember you'll need to build the `go-libp2p-daemon` binary to run the `nim-libp2p` tests.
To do so, please follow the installation instructions in [daemonapi.md](examples/go-daemon/daemonapi.md).

## Modules

List of modules implemented in nim-libp2p:

| Name | Description |
@@ -1,6 +1,8 @@
# Table of Contents
- [Introduction](#introduction)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Script](#script)
- [Usage](#usage)
- [Example](#example)
- [Getting Started](#getting-started)
@@ -8,26 +10,29 @@
# Introduction
This is a libp2p-backed daemon wrapping the functionality of go-libp2p for use in Nim. <br>
For more information about the go daemon, check out [this repository](https://github.com/libp2p/go-libp2p-daemon).
> **Required only** for running the tests.

# Prerequisites
Go version `1.15.15`.
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.

# Installation
Follow one of the methods below:

## Script
Run the build script while having the `go` command pointing to the correct Go version.
We recommend using `1.15.15`, as previously stated.
```sh
# clone and install dependencies
git clone https://github.com/status-im/nim-libp2p
cd nim-libp2p
nimble install

# perform unit tests
nimble test

# update the git submodule to install the go daemon
git submodule update --init --recursive
go version
git clone https://github.com/libp2p/go-libp2p-daemon
cd go-libp2p-daemon
git checkout v0.0.1
go install ./...
cd ..
./scripts/build_p2pd.sh
```
If everything goes correctly, the binary (`p2pd`) should be built and placed in the correct directory.
If you find any issues, please head into our Discord and ask for our assistance.

After successfully building the binary, remember to add it to your path so it can be found. You can do that by running:
```sh
export PATH="$PATH:$HOME/go/bin"
```
> **Tip:** To make this change permanent, add the command above to your `.bashrc` file.
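For example, one way to persist it (assuming bash):

```sh
echo 'export PATH="$PATH:$HOME/go/bin"' >> ~/.bashrc
```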

# Usage
@@ -29,9 +29,9 @@ const
when defined(libp2p_yamux_metrics):
declareGauge(libp2p_yamux_channels, "yamux channels", labels = ["initiator", "peer"])
declareHistogram libp2p_yamux_send_queue, "message send queue length (in byte)",
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 1600.0, 6400.0, 25600.0, 256000.0]
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
declareHistogram libp2p_yamux_recv_queue, "message recv queue length (in byte)",
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 1600.0, 6400.0, 25600.0, 256000.0]
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]

type
YamuxError* = object of CatchableError
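Worth noting: beyond swapping one value, the bucket edit above restores monotonic ordering. Histogram bucket bounds must be strictly increasing, and the old list placed 1600.0 after 2000.0.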
@@ -172,12 +172,16 @@ proc `$`(channel: YamuxChannel): string =
result &= " {" & s.foldl(if a != "": a & ", " & b else: b, "") & "}"

proc lengthSendQueue(channel: YamuxChannel): int =
## Returns the length of what remains to be sent
##
channel.sendQueue.foldl(a + b.data.len - b.sent, 0)

proc lengthSendQueueWithLimit(channel: YamuxChannel): int =
## Returns the length of what remains to be sent, but limit the size of big messages.
##
# For leniency, limit big messages size to the third of maxSendQueueSize
# This value is arbitrary, it's not in the specs
# It permits to store up to 3 big messages if the peer is stalling.
# This value is arbitrary, it's not in the specs, it permits to store up to
# 3 big messages if the peer is stalling.
channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0)
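A standalone illustration of that cap, with hypothetical numbers not taken from the codebase:

```nim
import std/sequtils

let maxSendQueueSize = 3_000_000
# two stalled 5 MB messages, none of their bytes sent yet
let pendingLens = @[5_000_000, 5_000_000]
# each message counts for at most a third of the limit
echo pendingLens.foldl(a + min(b, maxSendQueueSize div 3), 0)  # 2000000, still under the cap
```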

proc actuallyClose(channel: YamuxChannel) {.async.} =
@@ -200,6 +204,9 @@ method closeImpl*(channel: YamuxChannel) {.async.} =
await channel.actuallyClose()

proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
# If we reset locally, we want to flush up to a maximum of recvWindow
# bytes. It's because the peer we're connected to can send us data before
# it receives the reset.
if channel.isReset:
return
trace "Reset channel"
@@ -220,11 +227,14 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
await channel.remoteClosed()
channel.receivedData.fire()
if not isLocal:
# If we reset locally, we want to flush up to a maximum of recvWindow
# bytes. We use the recvWindow in the proc cleanupChann.
# If the reset is remote, there's no reason to flush anything.
channel.recvWindow = 0

proc updateRecvWindow(channel: YamuxChannel) {.async.} =
## Send to the peer a window update when the recvWindow is empty enough
##
# In order to avoid spamming a window update every time a byte is read,
# we send it every time half of the maxRecvWindow is read.
let inWindow = channel.recvWindow + channel.recvQueue.len
if inWindow > channel.maxRecvWindow div 2:
return
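A toy rendering of that half-window rule, with hypothetical sizes:

```nim
let maxRecvWindow = 256 * 1024
let inWindow = 100 * 1024  # remaining window credit plus queued, unread bytes
if inWindow <= maxRecvWindow div 2:
  echo "send a window update"  # only once at least half the window has been consumed
```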
@@ -242,6 +252,7 @@ method readOnce*(
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
## Read from a yamux channel

if channel.isReset:
raise if channel.remoteReset:
@@ -287,17 +298,18 @@ proc trySend(channel: YamuxChannel) {.async.} =
return
channel.isSending = true
defer: channel.isSending = false

while channel.sendQueue.len != 0:
channel.sendQueue.keepItIf(not (it.fut.cancelled() and it.sent == 0))
if channel.sendWindow == 0:
trace "send window empty"
trace "trying to send while the sendWindow is empty"
if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize:
debug "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize,
trace "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize,
currentQueueSize = channel.lengthSendQueueWithLimit()
try:
await channel.reset(true)
except CatchableError as exc:
debug "failed to reset", msg=exc.msg
warn "failed to reset", msg=exc.msg
break

let
@@ -316,20 +328,24 @@ proc trySend(channel: YamuxChannel) {.async.} =

var futures: seq[Future[void]]
while inBuffer < toSend:
# concatenate the different messages we try to send into one buffer
let (data, sent, fut) = channel.sendQueue[0]
let bufferToSend = min(data.len - sent, toSend - inBuffer)
sendBuffer.toOpenArray(12, 12 + toSend - 1)[inBuffer..<(inBuffer+bufferToSend)] =
channel.sendQueue[0].data.toOpenArray(sent, sent + bufferToSend - 1)
channel.sendQueue[0].sent.inc(bufferToSend)
if channel.sendQueue[0].sent >= data.len:
# if every byte of the message is in the buffer, add the write future to the
# sequence of futures to be completed (or failed) when the buffer is sent
futures.add(fut)
channel.sendQueue.delete(0)
inBuffer.inc(bufferToSend)

trace "build send buffer", h = $header, msg=string.fromBytes(sendBuffer[12..^1])
trace "try to send the buffer", h = $header
channel.sendWindow.dec(toSend)
try: await channel.conn.write(sendBuffer)
except CatchableError as exc:
trace "failed to send the buffer"
let connDown = newLPStreamConnDownError(exc)
for fut in futures.items():
fut.fail(connDown)
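For context on the `12` offsets above: a yamux frame header is 12 bytes (1-byte version, 1-byte type, 2-byte flags, 4-byte stream id, 4-byte length), so the payload is packed starting at offset 12 of the send buffer.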
@@ -340,6 +356,8 @@ proc trySend(channel: YamuxChannel) {.async.} =
channel.activity = true

method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
## Write to yamux channel
##
result = newFuture[void]("Yamux Send")
if channel.remoteReset:
result.fail(newLPStreamResetError())

@@ -352,10 +370,12 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
return result
channel.sendQueue.add((msg, 0, result))
when defined(libp2p_yamux_metrics):
libp2p_yamux_recv_queue.observe(channel.lengthSendQueue().int64)
libp2p_yamux_send_queue.observe(channel.lengthSendQueue().int64)
asyncSpawn channel.trySend()

proc open*(channel: YamuxChannel) {.async.} =
proc open(channel: YamuxChannel) {.async.} =
## Open a yamux channel by sending a window update with Syn or Ack flag
##
if channel.opened:
trace "Try to open channel twice"
return
@@ -381,7 +401,7 @@ proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values():
if v.isSrc == isSrc: result += 1

proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} =
proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async.} =
await channel.join()
m.channels.del(channel.id)
when defined(libp2p_yamux_metrics):

@@ -419,7 +439,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
when defined(libp2p_agents_metrics):
result.shortAgent = m.connection.shortAgent
m.channels[id] = result
asyncSpawn m.cleanupChann(result)
asyncSpawn m.cleanupChannel(result)
trace "created channel", id, pid=m.connection.peerId
when defined(libp2p_yamux_metrics):
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId])

@@ -440,7 +460,7 @@ method close*(m: Yamux) {.async.} =
trace "Closed yamux"

proc handleStream(m: Yamux, channel: YamuxChannel) {.async.} =
## call the muxer stream handler for this channel
## Call the muxer stream handler for this channel
##
try:
await m.streamHandler(channel)

@@ -474,6 +494,7 @@ method handle*(m: Yamux) {.async.} =
else:
if header.streamId in m.flushed:
m.flushed.del(header.streamId)

if header.streamId mod 2 == m.currentId mod 2:
debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId
raise newException(YamuxError, "Peer used our reserved stream id")
@@ -157,7 +157,7 @@ method rpcHandler*(f: FloodSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
trace "Forwarded message to peers", peers = toSendPeers.len

f.updateMetrics(rpcMsg)

@@ -219,7 +219,7 @@ method publish*(f: FloodSub,
return 0

# Try to send to all peers that are known to be interested
f.broadcast(peers, RPCMsg(messages: @[msg]))
f.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

when defined(libp2p_expensive_metrics):
libp2p_pubsub_messages_published.inc(labelValues = [topic])
@@ -46,6 +46,9 @@ declareCounter(libp2p_gossipsub_saved_bytes, "bytes saved by gossipsub optimizat
declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")

when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])

proc init*(_: type[GossipSubParams]): GossipSubParams =
GossipSubParams(
explicit: true,

@@ -217,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0

pubSubPeer.stopSendNonPriorityTask()

procCall FloodSub(g).unsubscribePeer(peer)

proc handleSubscribe*(g: GossipSub,

@@ -276,12 +281,28 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)

if
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages
let
isPruneNotEmpty = respControl.prune.len > 0
isIWantNotEmpty = respControl.iwant.len > 0

if isPruneNotEmpty or isIWantNotEmpty:

if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

if isPruneNotEmpty:
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl)), isHighPriority = true)

if messages.len > 0:
for smsg in messages:
for topic in smsg.topicIds:
if g.knownTopics.contains(topic):
@@ -289,18 +310,11 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
# iwant replies have lower priority
trace "sending iwant reply messages", peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))
RPCMsg(messages: messages), isHighPriority = false)

proc validateAndRelay(g: GossipSub,
msg: Message,

@@ -353,7 +367,7 @@ proc validateAndRelay(g: GossipSub,
if msg.data.len > msgId.len * 10:
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[msgId])]
))))
))), isHighPriority = true)

for peer in toSendPeers:
for heDontWant in peer.heDontWants:

@@ -367,7 +381,7 @@ proc validateAndRelay(g: GossipSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, validMsgId = msgId)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue
@@ -426,6 +440,11 @@ method rpcHandler*(g: GossipSub,
await rateLimit(g, peer, Opt.none(RPCMsg), msgSize)
return

when defined(libp2p_expensive_metrics):
for m in rpcMsg.messages:
for t in m.topicIds:
libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, t])

trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
await rateLimit(g, peer, Opt.some(rpcMsg), msgSize)

@@ -433,7 +452,7 @@ method rpcHandler*(g: GossipSub,
peer.recvObservers(rpcMsg)

if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]

@@ -478,6 +497,9 @@ method rpcHandler*(g: GossipSub,

libp2p_gossipsub_duplicate.inc()

if msg.data.len > msgId.len * 10: # Don't relay to the peers from which we already received (we only do this for large messages)
peer.heDontWants[^1].incl(msgId)

# onto the next message
continue

@@ -543,7 +565,7 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.unsubscribeBackoff.seconds.uint64)])))
g.broadcast(mpeers, msg)
g.broadcast(mpeers, msg, isHighPriority = true)

for peer in mpeers:
g.pruned(peer, topic, backoff = some(g.parameters.unsubscribeBackoff))

@@ -647,7 +669,7 @@ method publish*(g: GossipSub,

g.mcache.put(msgId, msg)

g.broadcast(peers, RPCMsg(messages: @[msg]))
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
@@ -530,14 +530,14 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
# Send changes to peers after table updates to avoid stale state
if grafts.len > 0:
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
g.broadcast(grafts, graft)
g.broadcast(grafts, graft, isHighPriority = true)
if prunes.len > 0:
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
g.broadcast(prunes, prune, isHighPriority = true)

proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
# drop peers that we haven't published to in

@@ -669,7 +669,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
topicID: t,
peers: g.peerExchangeList(t),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
g.broadcast(prunes, prune, isHighPriority = true)

# pass by ptr in order to both signal we want to update metrics
# and as well update the struct for each topic during this iteration

@@ -691,7 +691,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
else:
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
g.send(peer, RPCMsg(control: some(control)))
g.send(peer, RPCMsg(control: some(control)), isHighPriority = true)

g.mcache.shift() # shift the cache
@@ -138,18 +138,35 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

libp2p_pubsub_peers.set(p.peers.len.int64)

proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
## Attempt to send `msg` to remote peer
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool, id: MessageId = @[]) {.raises: [].} =
## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
##
## Parameters:
## - `p`: The `PubSub` instance.
## - `peer`: An instance of `PubSubPeer` representing the peer to whom the message should be sent.
## - `msg`: The `RPCMsg` instance that contains the message to be sent.
## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.

trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg, p.anonymize)
asyncSpawn peer.send(msg, p.anonymize, isHighPriority, id)

proc broadcast*(
p: PubSub,
sendPeers: auto, # Iterable[PubSubPeer]
msg: RPCMsg) {.raises: [].} =
## Attempt to send `msg` to the given peers
msg: RPCMsg,
isHighPriority: bool,
validMsgId: MessageId = @[]) {.raises: [].} =
## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
##
## Parameters:
## - `p`: The `PubSub` instance.
## - `sendPeers`: An iterable of `PubSubPeer` instances representing the peers to whom the message should be sent.
## - `msg`: The `RPCMsg` instance that contains the message to be broadcast.
## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.

let npeers = sendPeers.len.int64
for sub in msg.subscriptions:
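To make the split concrete, a pair of hypothetical call sites under the new signature (the peer set and messages are placeholders, not names from the diff):

```nim
# a node's own publish should not wait behind relayed traffic
g.broadcast(meshPeers, RPCMsg(messages: @[ownMsg]), isHighPriority = true)

# forwarding someone else's message can queue behind pending high-priority sends
g.broadcast(meshPeers, RPCMsg(messages: @[relayedMsg]), isHighPriority = false)
```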
@@ -195,19 +212,19 @@ proc broadcast*(

if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
p.send(peer, msg)
p.send(peer, msg, isHighPriority, validMsgId)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
asyncSpawn peer.sendEncoded(encoded)
asyncSpawn peer.sendEncoded(encoded, isHighPriority, validMsgId)

proc sendSubs*(p: PubSub,
peer: PubSubPeer,
topics: openArray[string],
subscribe: bool) =
## send subscriptions to remote peer
p.send(peer, RPCMsg.withSubs(topics, subscribe))
p.send(peer, RPCMsg.withSubs(topics, subscribe), isHighPriority = true)

for topic in topics:
if subscribe:
@@ -21,6 +21,8 @@ import rpc/[messages, message, protobuf],
../../protobuf/minprotobuf,
../../utility

#import gossipsub/libp2p_gossipsub_staggerDontWantSave

export peerid, connection, deques

logScope:

@@ -28,10 +30,12 @@ logScope:

when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])

type
PeerRateLimitError* = object of CatchableError

@@ -50,6 +54,18 @@ type
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

MessageWithId = object
message: seq[byte]
msgId: MessageId

RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
nonPriorityQueue: AsyncQueue[MessageWithId]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]

PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
onEvent*: OnEvent # Connectivity updates for peer

@@ -71,6 +87,8 @@ type
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]

rpcmessagequeue: RpcMessageQueue

RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}
@@ -83,6 +101,20 @@ when defined(libp2p_agents_metrics):
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent

proc newMessageWithId(msg: seq[byte], id: MessageId): MessageWithId =
result.message = msg
result.msgId = id

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"

func hash*(p: PubSubPeer): Hash =
p.peerId.hash
@@ -138,12 +170,6 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
conn, peer = p, closed = conn.closed,
data = data.shortLog

when defined(libp2p_expensive_metrics):
for m in rmsg.messages:
for t in m.topicIDs:
# metrics
libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])

await p.handler(p, data)
data = newSeq[byte]() # Release memory
except PeerRateLimitError as exc:
@@ -230,21 +256,17 @@ proc hasSendConn*(p: PubSubPeer): bool =
template sendMetrics(msg: RPCMsg): untyped =
when defined(libp2p_expensive_metrics):
for x in msg.messages:
for t in x.topicIDs:
for t in x.topicIds:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return
proc clearSendPriorityQueue(p: PubSubPeer) =
while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()

proc sendMsg(p: PubSubPeer, msg: seq[byte], msgId: MessageId) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
@@ -257,6 +279,12 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =

trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)

if msgId.len > 0:
for dontWants in p.heDontWants:
if msgId in dontWants:
#libp2p_gossipsub_staggerDontWantSave.inc()
trace "Skipped sending msg/dontwant received from peer", conn, encoded = shortLog(msg)
return
try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
@@ -269,6 +297,38 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =

await conn.close() # This will clean up the send connection

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} =
## Asynchronously sends an encoded message to a specified `PubSubPeer`.
##
## Parameters:
## - `p`: The `PubSubPeer` instance to which the message is to be sent.
## - `msg`: The message to be sent, encoded as a sequence of bytes (`seq[byte]`).
## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return

if isHighPriority:
p.clearSendPriorityQueue()
let f = p.sendMsg(msg, validMsgId)
if not f.finished:
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithId(msg, validMsgId))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
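Hypothetical call sites exercising both paths (the encoded buffers are placeholders):

```nim
await peer.sendEncoded(encodedPublish, isHighPriority = true)  # written immediately; its future is tracked
await peer.sendEncoded(encodedRelay, isHighPriority = false)   # parked in nonPriorityQueue until priority sends drain
```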
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
@@ -304,7 +364,16 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
else:
trace "message too big to send", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} =
## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
##
## Parameters:
## - `p`: The `PubSubPeer` instance to which the message is to be sent.
## - `msg`: The `RPCMsg` instance representing the message to be sent.
## - `anonymize`: A boolean flag indicating whether the message should be sent with anonymization.
## - `isHighPriority`: A boolean flag indicating whether the message should be treated as high priority.
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.
# When sending messages, we take care to re-encode them with the right
# anonymization flag to ensure that we're not penalized for sending invalid
# or malicious data on the wire - in particular, re-encoding protects against
@@ -324,11 +393,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =

if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
asyncSpawn p.sendEncoded(encodedSplitMsg)
await p.sendEncoded(encodedSplitMsg, isHighPriority, validMsgId)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded)
await p.sendEncoded(encoded, isHighPriority, validMsgId)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
@@ -337,6 +406,43 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
return true
return false

proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
while true:
# we send non-priority messages only if there are no pending priority messages
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
p.clearSendPriorityQueue()
# this minimizes the number of times we have to wait for something (each wait = performance cost)
# we will never wait for a finished future and by waiting for the last one, all that come before it are guaranteed
# to be finished already (since sends are processed in order).
if p.rpcmessagequeue.sendPriorityQueue.len > 0:
await p.rpcmessagequeue.sendPriorityQueue[^1]
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
await p.sendMsg(msg.message, msg.msgId)

proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
if p.rpcmessagequeue.sendNonPriorityTask.isNil:
p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()

proc stopSendNonPriorityTask*(p: PubSubPeer) =
if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
debug "stopping sendNonPriorityTask", p
p.rpcmessagequeue.sendNonPriorityTask.cancel()
p.rpcmessagequeue.sendNonPriorityTask = nil
p.rpcmessagequeue.sendPriorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue]): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[MessageWithId](),
)
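The drain loop above awaits only the newest entry of `sendPriorityQueue`; since the queued writes complete in order, that one future finishing implies all earlier ones have finished. A self-contained chronos sketch of the same idea (the sleeps stand in for in-order sends):

```nim
import chronos
import std/sequtils

proc demo() {.async.} =
  var inFlight: seq[Future[void]]
  for i in 1 .. 3:
    inFlight.add(sleepAsync(i.millis))  # completes in FIFO order, like queued writes
  await inFlight[^1]                    # newest done implies the earlier ones are done
  doAssert inFlight.allIt(it.finished)

waitFor demo()
```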
proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
@@ -353,17 +459,9 @@ proc new*(
peerId: peerId,
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize,
overheadRateLimitOpt: overheadRateLimitOpt
overheadRateLimitOpt: overheadRateLimitOpt,
rpcmessagequeue: RpcMessageQueue.new(),
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
result.startSendNonPriorityTask()
@@ -89,6 +89,7 @@ build_target() {
mkdir "$CACHE_DIR"
cp -a "$TARGET_DIR"/* "$CACHE_DIR"/
fi
echo "Binary built successfully."
}

if target_needs_rebuilding; then
@@ -1,6 +1,7 @@
{.push raises: [].}

import chronos
import macros
import algorithm

import ../libp2p/transports/tcptransport
@@ -110,19 +111,83 @@ proc bridgedConnections*: (Connection, Connection) =
await connA.pushData(data)
return (connA, connB)


proc checkExpiringInternal(cond: proc(): bool {.raises: [], gcsafe.} ): Future[bool] {.async.} =
let start = Moment.now()
while true:
if Moment.now() > (start + chronos.seconds(5)):
return false
elif cond():
return true
macro checkUntilCustomTimeout*(timeout: Duration, code: untyped): untyped =
## Periodically checks a given condition until it is true or a timeout occurs.
##
## `code`: untyped - A condition expression that should eventually evaluate to true.
## `timeout`: Duration - The maximum duration to wait for the condition to be true.
##
## Examples:
## ```nim
## # Example 1:
## asyncTest "checkUntilCustomTimeout should pass if the condition is true":
## let a = 2
## let b = 2
## checkUntilCustomTimeout(2.seconds):
## a == b
##
## # Example 2: Multiple conditions
## asyncTest "checkUntilCustomTimeout should pass if the conditions are true":
## let a = 2
## let b = 2
## checkUntilCustomTimeout(5.seconds):
## a == b
## a == 2
## b == 2
## ```
# Helper proc to recursively build a combined boolean expression
proc buildAndExpr(n: NimNode): NimNode =
if n.kind == nnkStmtList and n.len > 0:
var combinedExpr = n[0] # Start with the first expression
for i in 1..<n.len:
# Combine the current expression with the next using 'and'
combinedExpr = newCall("and", combinedExpr, n[i])
return combinedExpr
else:
await sleepAsync(1.millis)
return n

template checkExpiring*(code: untyped): untyped =
check await checkExpiringInternal(proc(): bool = code)
# Build the combined expression
let combinedBoolExpr = buildAndExpr(code)

result = quote do:
proc checkExpiringInternal(): Future[void] {.gensym, async.} =
let start = Moment.now()
while true:
if Moment.now() > (start + `timeout`):
checkpoint("[TIMEOUT] Timeout was reached and the conditions were not true. Check if the code is working as " &
"expected or consider increasing the timeout param.")
check `code`
return
else:
if `combinedBoolExpr`:
return
else:
await sleepAsync(1.millis)
await checkExpiringInternal()

macro checkUntilTimeout*(code: untyped): untyped =
## Same as `checkUntilCustomTimeout` but with a default timeout of 10 seconds.
##
## Examples:
## ```nim
## # Example 1:
## asyncTest "checkUntilTimeout should pass if the condition is true":
## let a = 2
## let b = 2
## checkUntilTimeout:
## a == b
##
## # Example 2: Multiple conditions
## asyncTest "checkUntilTimeout should pass if the conditions are true":
## let a = 2
## let b = 2
## checkUntilTimeout:
## a == b
## a == 2
## b == 2
## ```
result = quote do:
checkUntilCustomTimeout(10.seconds, `code`)

proc unorderedCompare*[T](a, b: seq[T]): bool =
if a == b:
@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.5-labs
FROM nimlang/nim:1.6.14 as builder
FROM nimlang/nim:1.6.16 as builder

WORKDIR /workspace
@@ -361,7 +361,7 @@ suite "FloodSub":
check (await smallNode[0].publish("foo", smallMessage1)) > 0
check (await bigNode[0].publish("foo", smallMessage2)) > 0

checkExpiring: messageReceived == 2
checkUntilTimeout: messageReceived == 2

check (await smallNode[0].publish("foo", bigMessage)) > 0
check (await bigNode[0].publish("foo", bigMessage)) > 0

@@ -396,7 +396,7 @@ suite "FloodSub":

check (await bigNode1[0].publish("foo", bigMessage)) > 0

checkExpiring: messageReceived == 1
checkUntilTimeout: messageReceived == 1

await allFuturesThrowing(
bigNode1[0].switch.stop(),
@@ -779,9 +779,9 @@ suite "GossipSub internal":

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
))))
))), isHighPriority = false)

checkExpiring: receivedMessages[] == sentMessages
checkUntilTimeout: receivedMessages[] == sentMessages
check receivedMessages[].len == 2

await teardownTest(gossip0, gossip1)

@@ -796,10 +796,10 @@ suite "GossipSub internal":

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)

await sleepAsync(300.milliseconds)
checkExpiring: receivedMessages[].len == 0
checkUntilTimeout: receivedMessages[].len == 0

await teardownTest(gossip0, gossip1)

@@ -813,9 +813,9 @@ suite "GossipSub internal":

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)

checkExpiring: receivedMessages[] == sentMessages
checkUntilTimeout: receivedMessages[] == sentMessages
check receivedMessages[].len == 2

await teardownTest(gossip0, gossip1)

@@ -831,7 +831,7 @@ suite "GossipSub internal":

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)

var smallestSet: HashSet[seq[byte]]
let seqs = toSeq(sentMessages)

@@ -840,7 +840,7 @@ suite "GossipSub internal":
else:
smallestSet.incl(seqs[1])

checkExpiring: receivedMessages[] == smallestSet
checkUntilTimeout: receivedMessages[] == smallestSet
check receivedMessages[].len == 1

await teardownTest(gossip0, gossip1)
@@ -310,9 +310,9 @@ suite "GossipSub":
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])

checkExpiring:
"foobar" in gossip2.topics and
"foobar" in gossip1.gossipsub and
checkUntilTimeout:
"foobar" in gossip2.topics
"foobar" in gossip1.gossipsub
gossip1.gossipsub.hasPeerId("foobar", gossip2.peerInfo.peerId)

await allFuturesThrowing(

@@ -454,9 +454,9 @@ suite "GossipSub":
nodes[1].subscribe("foobar", handler)

let gsNode = GossipSub(nodes[1])
checkExpiring:
gsNode.mesh.getOrDefault("foobar").len == 0 and
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 and
checkUntilTimeout:
gsNode.mesh.getOrDefault("foobar").len == 0
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
(
GossipSub(nodes[0]).gossipsub.getOrDefault("foobar").len == 1 or
GossipSub(nodes[0]).fanout.getOrDefault("foobar").len == 1

@@ -572,16 +572,16 @@ suite "GossipSub":
gossip1.seen = TimedCache[MessageId].init()
gossip3.seen = TimedCache[MessageId].init()
let msgId = toSeq(gossip2.validationSeen.keys)[0]
checkExpiring(try: gossip2.validationSeen[msgId].len > 0 except: false)
checkUntilTimeout(try: gossip2.validationSeen[msgId].len > 0 except: false)
result = ValidationResult.Accept
bFinished.complete()

nodes[1].addValidator("foobar", slowValidator)

checkExpiring(
gossip1.mesh.getOrDefault("foobar").len == 2 and
gossip2.mesh.getOrDefault("foobar").len == 2 and
gossip3.mesh.getOrDefault("foobar").len == 2)
checkUntilTimeout:
gossip1.mesh.getOrDefault("foobar").len == 2
gossip2.mesh.getOrDefault("foobar").len == 2
gossip3.mesh.getOrDefault("foobar").len == 2
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 2

await bFinished

@@ -676,7 +676,7 @@ suite "GossipSub":

# Now try with a mesh
gossip1.subscribe("foobar", handler)
checkExpiring: gossip1.mesh.peers("foobar") > 5
checkUntilTimeout: gossip1.mesh.peers("foobar") > 5

# use a different length so that the message is not equal to the last
check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == numPeersSecondMsg

@@ -912,14 +912,14 @@ suite "GossipSub":

gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
))))
checkExpiring: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)
))), isHighPriority = true)
checkUntilTimeout: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)

tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1

await bFinished

checkExpiring: toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 1)
checkUntilTimeout: toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 1)
check: toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 0)

await allFuturesThrowing(

@@ -968,7 +968,10 @@ suite "GossipSub":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()

gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]))
gossip0.broadcast(
gossip0.mesh["foobar"],
RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]),
isHighPriority = true)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits

@@ -976,7 +979,10 @@ suite "GossipSub":

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]))
gossip0.broadcast(
gossip0.mesh["foobar"],
RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]),
isHighPriority = true)
await sleepAsync(300.millis)

check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

@@ -990,7 +996,7 @@ suite "GossipSub":
let (nodes, gossip0, gossip1) = await initializeGossipTest()

# Simulate sending an undecodable message
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte))
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte), isHighPriority = true)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1

@@ -998,9 +1004,9 @@ suite "GossipSub":

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte))
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte), isHighPriority = true)

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2

await stopNodes(nodes)

@@ -1014,7 +1020,7 @@ suite "GossipSub":
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg)
gossip0.broadcast(gossip0.mesh["foobar"], msg, isHighPriority = true)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1

@@ -1027,9 +1033,9 @@ suite "GossipSub":
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg2)
gossip0.broadcast(gossip0.mesh["foobar"], msg2, isHighPriority = true)

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2

await stopNodes(nodes)

@@ -1049,7 +1055,7 @@ suite "GossipSub":

let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])

gossip0.broadcast(gossip0.mesh[topic], msg)
gossip0.broadcast(gossip0.mesh[topic], msg, isHighPriority = true)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1

@@ -1057,9 +1063,12 @@ suite "GossipSub":

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]))
gossip0.broadcast(
gossip0.mesh[topic],
RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]),
isHighPriority = true)

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2

await stopNodes(nodes)
@@ -215,7 +215,7 @@ suite "Connection Manager":

await connMngr.close()

checkExpiring: waitedConn3.cancelled()
checkUntilTimeout: waitedConn3.cancelled()

await allFuturesThrowing(
allFutures(muxs.mapIt( it.close() )))

@@ -231,7 +231,7 @@ suite "Connection Manager":

await muxer.close()

checkExpiring: muxer notin connMngr
checkUntilTimeout: muxer notin connMngr

await connMngr.close()

@@ -254,7 +254,7 @@ suite "Connection Manager":
check peerId in connMngr
await connMngr.dropPeer(peerId)

checkExpiring: peerId notin connMngr
checkUntilTimeout: peerId notin connMngr
check isNil(connMngr.selectMuxer(peerId, Direction.In))
check isNil(connMngr.selectMuxer(peerId, Direction.Out))
@@ -64,7 +64,7 @@ suite "Dcutr":
await DcutrClient.new().startSync(behindNATSwitch, publicSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
.wait(300.millis)

checkExpiring:
checkUntilTimeout:
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 2

@@ -83,7 +83,7 @@ suite "Dcutr":

body

checkExpiring:
checkUntilTimeout:
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 2

@@ -150,7 +150,7 @@ suite "Dcutr":
await DcutrClient.new().startSync(behindNATSwitch, publicSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
.wait(300.millis)

checkExpiring:
checkUntilTimeout:
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 1
tests/testhelpers.nim (new file): 42 lines
@@ -0,0 +1,42 @@
{.used.}

# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

import ./helpers

suite "Helpers":

asyncTest "checkUntilTimeout should pass if the condition is true":
let a = 2
let b = 2
checkUntilTimeout:
a == b

asyncTest "checkUntilTimeout should pass if the conditions are true":
let a = 2
let b = 2
checkUntilTimeout:
a == b
a == 2
b == 2

asyncTest "checkUntilCustomTimeout should pass when the condition is true":
let a = 2
let b = 2
checkUntilCustomTimeout(2.seconds):
a == b

asyncTest "checkUntilCustomTimeout should pass when the conditions are true":
let a = 2
let b = 2
checkUntilCustomTimeout(5.seconds):
a == b
a == 2
b == 2
@@ -89,8 +89,8 @@ suite "Hole Punching":

await publicPeerSwitch.connect(privatePeerSwitch.peerInfo.peerId, (await privatePeerRelayAddr))

checkExpiring:
privatePeerSwitch.connManager.connCount(publicPeerSwitch.peerInfo.peerId) == 1 and
checkUntilTimeout:
privatePeerSwitch.connManager.connCount(publicPeerSwitch.peerInfo.peerId) == 1
not isRelayed(privatePeerSwitch.connManager.selectMuxer(publicPeerSwitch.peerInfo.peerId).connection)

await allFuturesThrowing(

@@ -127,8 +127,8 @@ suite "Hole Punching":

await publicPeerSwitch.connect(privatePeerSwitch.peerInfo.peerId, (await privatePeerRelayAddr))

checkExpiring:
privatePeerSwitch.connManager.connCount(publicPeerSwitch.peerInfo.peerId) == 1 and
checkUntilTimeout:
privatePeerSwitch.connManager.connCount(publicPeerSwitch.peerInfo.peerId) == 1
not isRelayed(privatePeerSwitch.connManager.selectMuxer(publicPeerSwitch.peerInfo.peerId).connection)

await allFuturesThrowing(
@@ -219,8 +219,8 @@ suite "Identify":

await identifyPush2.push(switch2.peerInfo, conn)

checkExpiring: switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
checkExpiring: switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
checkUntilTimeout: switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
checkUntilTimeout: switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs

await closeAll()
@@ -829,7 +829,7 @@ suite "Mplex":
check:
unorderedCompare(dialStreams, mplexDial.getStreams())

checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10

await mplexListen.close()
await allFuturesThrowing(

@@ -876,7 +876,7 @@ suite "Mplex":
check:
unorderedCompare(dialStreams, mplexDial.getStreams())

checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10

mplexHandle.cancel()
await allFuturesThrowing(

@@ -920,7 +920,7 @@ suite "Mplex":
check:
unorderedCompare(dialStreams, mplexDial.getStreams())

checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10

await conn.close()
await allFuturesThrowing(

@@ -967,7 +967,7 @@ suite "Mplex":
check:
unorderedCompare(dialStreams, mplexDial.getStreams())

checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10

await listenConn.closeWithEOF()
await allFuturesThrowing(
@@ -57,4 +57,5 @@ import testtcptransport,
testautorelay,
testdcutr,
testhpservice,
testutility
testutility,
testhelpers
@@ -62,8 +62,8 @@ suite "RendezVous Interface":
dm.advertise(RdvNamespace("ns1"))
dm.advertise(RdvNamespace("ns2"))

checkExpiring: rdv.numAdvertiseNs1 >= 5
checkExpiring: rdv.numAdvertiseNs2 >= 5
checkUntilTimeout: rdv.numAdvertiseNs1 >= 5
checkUntilTimeout: rdv.numAdvertiseNs2 >= 5
await client.stop()

asyncTest "Check timeToAdvertise interval":
@@ -283,12 +283,12 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)

check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)

checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)

checkExpiring:
checkUntilTimeout:
startCounts ==
@[
switch1.connManager.inSema.count, switch1.connManager.outSema.count,

@@ -336,7 +336,7 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)

check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)

checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)

@@ -388,7 +388,7 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)

check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)

checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)

@@ -439,7 +439,7 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)

check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)

checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)

@@ -490,7 +490,7 @@ suite "Switch":
await switch2.disconnect(switch1.peerInfo.peerId)

check not switch2.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)

checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)

@@ -554,8 +554,8 @@ suite "Switch":

check not switch2.isConnected(switch1.peerInfo.peerId)
check not switch3.isConnected(switch1.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkExpiring: not switch1.isConnected(switch3.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch3.peerInfo.peerId)

checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)

@@ -711,7 +711,7 @@ suite "Switch":

await allFuturesThrowing(readers)
await switch2.stop() #Otherwise this leaks
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
checkUntilTimeout: not switch1.isConnected(switch2.peerInfo.peerId)

checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)
@@ -36,8 +36,8 @@ suite "Yamux":
yamuxa.close(), yamuxb.close(),
handlera, handlerb)

suite "Basic":
asyncTest "Simple test":
suite "Simple Reading/Writing yamux messages":
asyncTest "Roundtrip of small messages":
mSetup()

yamuxb.streamHandler = proc(conn: Connection) {.async.} =