Compare commits

...

20 Commits

Author SHA1 Message Date
Gabriel Cruz
fd4100e19c test: dummy pr 2025-05-28 15:31:41 -03:00
richΛrd
cd60b254a0 chore(version): update libp2p.nimble to 1.10.1 (#1390) 2025-05-21 07:40:11 -04:00
richΛrd
b88cdcdd4b chore: make quic optional (#1389) 2025-05-20 21:04:30 -04:00
vladopajic
4a5e06cb45 revert: disable transport interop with zig-v0.0.1 (#1372) (#1383) 2025-05-20 14:20:42 +02:00
vladopajic
fff3a7ad1f chore(hp): add timeout on dial (#1378) 2025-05-20 11:10:01 +00:00
Miran
05c894d487 fix(ci): test Nim 2.2 (#1385) 2025-05-19 15:51:56 -03:00
vladopajic
8850e9ccd9 ci(test): reduce timeout (#1376) 2025-05-19 15:34:16 +00:00
Ivan FB
2746531851 chore(dialer): capture possible exception (#1381) 2025-05-19 10:57:04 -04:00
vladopajic
2856db5490 ci(interop): disable transport interop with zig-v0.0.1 (#1372) 2025-05-15 20:04:41 +00:00
AYAHASSAN287
b29e78ccae test(gossipsub): block5 protobuf test cases (#1204)
Co-authored-by: Radoslaw Kaminski <radoslaw@status.im>
2025-05-15 16:32:03 +01:00
Gabriel Cruz
c9761c3588 chore: improve README.md text (#1373) 2025-05-15 12:35:01 +00:00
richΛrd
e4ef21e07c chore: bump quic (#1371)
Co-authored-by: Gabriel Cruz <8129788+gmelodie@users.noreply.github.com>
2025-05-14 21:06:38 +00:00
Miran
61429aa0d6 chore: fix import warnings (#1370) 2025-05-14 19:08:46 +00:00
Radosław Kamiński
c1ef011556 test(gossipsub): refactor testgossipinternal (#1366) 2025-05-14 17:15:31 +01:00
vladopajic
cd1424c09f chore(interop): use the same redis dependency (#1364) 2025-05-14 15:49:51 +00:00
Miran
878d627f93 chore: update dependencies (#1368) 2025-05-14 10:51:08 -03:00
richΛrd
1d6385ddc5 chore: bump quic (#1361)
Co-authored-by: Gabriel Cruz <8129788+gmelodie@users.noreply.github.com>
2025-05-14 11:40:13 +00:00
Gabriel Cruz
873f730b4e chore: change nim-stew dep tagging (#1362) 2025-05-13 21:46:07 -04:00
Radosław Kamiński
1c1547b137 test(gossipsub): Topic Membership Tests - updated (#1363) 2025-05-13 16:22:49 +01:00
Álex
9997f3e3d3 test(gossipsub): control message (#1191)
Co-authored-by: Radoslaw Kaminski <radoslaw@status.im>
2025-05-13 10:54:07 -04:00
38 changed files with 821 additions and 560 deletions

View File

@@ -14,7 +14,7 @@ concurrency:
jobs:
test:
timeout-minutes: 90
timeout-minutes: 40
strategy:
fail-fast: false
matrix:
@@ -36,6 +36,8 @@ jobs:
memory_management: refc
- ref: version-2-0
memory_management: refc
- ref: version-2-2
memory_management: refc
include:
- platform:
os: linux
@@ -116,5 +118,5 @@ jobs:
nimble --version
gcc --version
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }}"
export NIMFLAGS="${NIMFLAGS} -d:libp2p_quic_support --mm:${{ matrix.nim.memory_management }}"
nimble test

View File

@@ -51,7 +51,7 @@ jobs:
- name: Run test suite with coverage flags
run: |
export NIMFLAGS="--lineDir:on --passC:-fprofile-arcs --passC:-ftest-coverage --passL:-fprofile-arcs --passL:-ftest-coverage"
export NIMFLAGS="-d:libp2p_quic_support --lineDir:on --passC:-fprofile-arcs --passC:-ftest-coverage --passL:-fprofile-arcs --passL:-ftest-coverage"
nimble testnative
nimble testpubsub
nimble testfilter

View File

@@ -36,7 +36,7 @@ jobs:
test:
needs: delete_cache
timeout-minutes: 90
timeout-minutes: 40
strategy:
fail-fast: false
matrix:
@@ -97,5 +97,5 @@ jobs:
dependency_solver="legacy"
fi
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }} --solver:${dependency_solver}"
export NIMFLAGS="${NIMFLAGS} -d:libp2p_quic_support --mm:${{ matrix.nim.memory_management }} --solver:${dependency_solver}"
nimble test

26
.pinned
View File

@@ -1,19 +1,19 @@
bearssl;https://github.com/status-im/nim-bearssl@#667b40440a53a58e9f922e29e20818720c62d9ac
bearssl;https://github.com/status-im/nim-bearssl@#34d712933a4e0f91f5e66bc848594a581504a215
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
chronos;https://github.com/status-im/nim-chronos@#c04576d829b8a0a1b12baaa8bc92037501b3a4a0
dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8
faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309
httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
faststreams;https://github.com/status-im/nim-faststreams@#c51315d0ae5eb2594d0bf41181d0e1aca1b3c01d
httputils;https://github.com/status-im/nim-http-utils@#79cbab1460f4c0cdde2084589d017c43a3d7b4f1
json_serialization;https://github.com/status-im/nim-json-serialization@#2b1c5eb11df3647a2cee107cd4cce3593cbb8bcf
metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
ngtcp2;https://github.com/status-im/nim-ngtcp2@#9456daa178c655bccd4a3c78ad3b8cce1f0add73
nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
quic;https://github.com/status-im/nim-quic.git@#d54e8f0f2e454604b767fadeae243d95c30c383f
results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad
secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be
serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35
stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e
testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c
nimcrypto;https://github.com/cheatfate/nimcrypto@#19c41d6be4c00b4a2c8000583bd30cf8ceb5f4b1
quic;https://github.com/status-im/nim-quic.git@#a6c30263c95fc5ddb2ef4d197c09b282555c06b0
results;https://github.com/arnetheduck/nim-results@#df8113dda4c2d74d460a8fa98252b0b771bf1f27
secp256k1;https://github.com/status-im/nim-secp256k1@#f808ed5e7a7bfc42204ec7830f14b7a42b63c284
serialization;https://github.com/status-im/nim-serialization@#548d0adc9797a10b2db7f788b804330306293088
stew;https://github.com/status-im/nim-stew@#0db179256cf98eb9ce9ee7b9bc939f219e621f77
testutils;https://github.com/status-im/nim-testutils@#9e842bd58420d23044bc55e16088e8abbe93ce51
unittest2;https://github.com/status-im/nim-unittest2@#8b51e99b4a57fcfb31689230e75595f024543024
websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982
zlib;https://github.com/status-im/nim-zlib@#38b72eda9d70067df4a953f56b5ed59630f2a17b
zlib;https://github.com/status-im/nim-zlib@#daa8723fd32299d4ca621c837430c29a5a11e19a

View File

@@ -20,7 +20,7 @@
- [Background](#background)
- [Install](#install)
- [Getting Started](#getting-started)
- [Go-libp2p-daemon](#go-libp2p-daemon)
- [Testing](#testing)
- [Modules](#modules)
- [Users](#users)
- [Stability](#stability)
@@ -33,22 +33,22 @@
## Background
libp2p is a [Peer-to-Peer](https://en.wikipedia.org/wiki/Peer-to-peer) networking stack, with [implementations](https://github.com/libp2p/libp2p#implementations) in multiple languages derived from the same [specifications.](https://github.com/libp2p/specs)
Building large scale peer-to-peer systems has been complex and difficult in the last 15 years and libp2p is a way to fix that. It's striving to be a modular stack, with sane and secure defaults, useful protocols, while remain open and extensible.
This implementation in native Nim, relying on [chronos](https://github.com/status-im/nim-chronos) for async. It's used in production by a few [projects](#users)
Building large scale peer-to-peer systems has been complex and difficult in the last 15 years and libp2p is a way to fix that. It strives to be a modular stack with secure defaults and useful protocols, while remaining open and extensible.
This is a native Nim implementation, using [chronos](https://github.com/status-im/nim-chronos) for asynchronous execution. It's used in production by a few [projects](#users)
Learn more about libp2p at [**libp2p.io**](https://libp2p.io) and follow libp2p's documentation [**docs.libp2p.io**](https://docs.libp2p.io).
## Install
**Prerequisite**
- [Nim](https://nim-lang.org/install.html)
> The currently supported Nim version is 1.6.18.
> The currently supported Nim versions are 1.6, 2.0 and 2.2.
```
nimble install libp2p
```
## Getting Started
You'll find the nim-libp2p documentation [here](https://vacp2p.github.io/nim-libp2p/docs/).
You'll find the nim-libp2p documentation [here](https://vacp2p.github.io/nim-libp2p/docs/). See [examples](./examples) for simple usage patterns.
### Testing
Remember you'll need to build the `go-libp2p-daemon` binary to run the `nim-libp2p` tests.
@@ -80,10 +80,10 @@ List of packages modules implemented in nim-libp2p:
| [libp2p-yamux](libp2p/muxers/yamux/yamux.nim) | [Yamux](https://docs.libp2p.io/concepts/multiplex/yamux/) multiplexer |
| **Data Types** | |
| [peer-id](libp2p/peerid.nim) | [Cryptographic identifiers](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-id) |
| [peer-store](libp2p/peerstore.nim) | ["Address book" of known peers](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-store) |
| [peer-store](libp2p/peerstore.nim) | [Address book of known peers](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-store) |
| [multiaddress](libp2p/multiaddress.nim) | [Composable network addresses](https://github.com/multiformats/multiaddr) |
| [signed envelope](libp2p/signed_envelope.nim) | [Signed generic data container](https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md) |
| [routing record](libp2p/routing_record.nim) | [Signed peer dialing informations](https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md) |
| [signed-envelope](libp2p/signed_envelope.nim) | [Signed generic data container](https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md) |
| [routing-record](libp2p/routing_record.nim) | [Signed peer dialing informations](https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md) |
| [discovery manager](libp2p/discovery/discoverymngr.nim) | Discovery Manager |
| **Utilities** | |
| [libp2p-crypto](libp2p/crypto) | Cryptographic backend |
@@ -156,6 +156,11 @@ The code follows the [Status Nim Style Guide](https://status-im.github.io/nim-st
### Compile time flags
Enable quic transport support
```bash
nim c -d:libp2p_quic_support some_file.nim
```
Enable expensive metrics (ie, metrics with per-peer cardinality):
```bash
nim c -d:libp2p_expensive_metrics some_file.nim

View File

@@ -52,7 +52,6 @@ else:
stream/connection,
transports/transport,
transports/tcptransport,
transports/quictransport,
protocols/secure/noise,
cid,
multihash,
@@ -71,3 +70,7 @@ else:
minprotobuf, switch, peerid, peerinfo, connection, multiaddress, crypto, lpstream,
bufferstream, muxer, mplex, transport, tcptransport, noise, errors, cid, multihash,
multicodec, builders, pubsub
when defined(libp2p_quic_support):
import libp2p/transports/quictransport
export quictransport

View File

@@ -1,17 +1,18 @@
mode = ScriptMode.Verbose
packageName = "libp2p"
version = "1.10.0"
version = "1.10.1"
author = "Status Research & Development GmbH"
description = "LibP2P implementation"
license = "MIT"
skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
# test
requires "nim >= 1.6.0",
"nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
"chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew#head",
"chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew >= 0.4.0",
"websock", "unittest2", "results",
"https://github.com/status-im/nim-quic.git#d54e8f0f2e454604b767fadeae243d95c30c383f"
"https://github.com/status-im/nim-quic.git#a6c30263c95fc5ddb2ef4d197c09b282555c06b0"
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)

View File

@@ -23,7 +23,7 @@ import
stream/connection,
multiaddress,
crypto/crypto,
transports/[transport, tcptransport, quictransport, memorytransport],
transports/[transport, tcptransport, memorytransport],
muxers/[muxer, mplex/mplex, yamux/yamux],
protocols/[identify, secure/secure, secure/noise, rendezvous],
protocols/connectivity/[autonat/server, relay/relay, relay/client, relay/rtransport],
@@ -169,11 +169,14 @@ proc withTcpTransport*(
TcpTransport.new(flags, upgr)
)
proc withQuicTransport*(b: SwitchBuilder): SwitchBuilder {.public.} =
b.withTransport(
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
QuicTransport.new(upgr, privateKey)
)
when defined(libp2p_quic_support):
import transports/quictransport
proc withQuicTransport*(b: SwitchBuilder): SwitchBuilder {.public.} =
b.withTransport(
proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
QuicTransport.new(upgr, privateKey)
)
proc withMemoryTransport*(b: SwitchBuilder): SwitchBuilder {.public.} =
b.withTransport(

View File

@@ -124,9 +124,13 @@ proc expandDnsAddr(
for resolvedAddress in resolved:
let lastPart = resolvedAddress[^1].tryGet()
if lastPart.protoCode == Result[MultiCodec, string].ok(multiCodec("p2p")):
let
var peerIdBytes: seq[byte]
try:
peerIdBytes = lastPart.protoArgument().tryGet()
addrPeerId = PeerId.init(peerIdBytes).tryGet()
except ResultError[string]:
raiseAssert "expandDnsAddr failed in protoArgument: " & getCurrentExceptionMsg()
let addrPeerId = PeerId.init(peerIdBytes).tryGet()
result.add((resolvedAddress[0 ..^ 2].tryGet(), Opt.some(addrPeerId)))
else:
result.add((resolvedAddress, peerId))

View File

@@ -9,7 +9,7 @@
{.push raises: [].}
import stew/results
import results
import chronos, chronicles
import ../../../switch, ../../../multiaddress, ../../../peerid
import core

View File

@@ -11,7 +11,7 @@
import std/sequtils
import stew/results
import results
import chronos, chronicles
import core

View File

@@ -10,8 +10,8 @@
{.push raises: [].}
import std/[sets, sequtils]
import stew/[results, objects]
import chronos, chronicles
import stew/objects
import results, chronos, chronicles
import core
import

View File

@@ -29,7 +29,7 @@ import
../../utility,
../../switch
import stew/results
import results
export results
import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat

View File

@@ -31,7 +31,7 @@ import
../../errors,
../../utility
import stew/results
import results
export results
export tables, sets

View File

@@ -9,8 +9,8 @@
{.push raises: [].}
import std/[sequtils, strutils, tables, hashes, options, sets, deques]
import stew/results
import std/[sequtils, tables, hashes, options, sets, deques]
import results
import chronos, chronicles, nimcrypto/sha2, metrics
import chronos/ratelimit
import

View File

@@ -10,7 +10,7 @@
{.push raises: [].}
import std/[hashes, sets]
import chronos/timer, stew/results
import chronos/timer, results
import ../../utility

View File

@@ -12,7 +12,7 @@
{.push raises: [].}
import std/[sequtils]
import stew/results
import results
import chronos, chronicles
import
transport,

View File

@@ -1,4 +1,3 @@
import options, tables
import chronos, chronicles, stew/byteutils
import helpers
import ../libp2p

View File

@@ -7,7 +7,7 @@ COPY .pinned libp2p.nimble nim-libp2p/
RUN --mount=type=cache,target=/var/cache/apt apt-get update && apt-get install -y libssl-dev
RUN cd nim-libp2p && nimble install_pinned && nimble install redis -y
RUN cd nim-libp2p && nimble install_pinned && nimble install "redis@#b341fe240dbf11c544011dd0e033d3c3acca56af" -y
COPY . nim-libp2p/

View File

@@ -18,6 +18,9 @@ import
import ../stubs/autonatclientstub
import ../errorhelpers
logScope:
topics = "hp interop node"
proc createSwitch(r: Relay = nil, hpService: Service = nil): Switch =
let rng = newRng()
var builder = SwitchBuilder
@@ -41,86 +44,96 @@ proc createSwitch(r: Relay = nil, hpService: Service = nil): Switch =
return s
proc main() {.async.} =
let relayClient = RelayClient.new()
let autoRelayService = AutoRelayService.new(1, relayClient, nil, newRng())
let autonatClientStub = AutonatClientStub.new(expectedDials = 1)
autonatClientStub.answer = NotReachable
let autonatService = AutonatService.new(autonatClientStub, newRng(), maxQueueSize = 1)
let hpservice = HPService.new(autonatService, autoRelayService)
let
isListener = getEnv("MODE") == "listen"
switch = createSwitch(relayClient, hpservice)
auxSwitch = createSwitch()
redisClient = open("redis", 6379.Port)
debug "Connected to redis"
await switch.start()
await auxSwitch.start()
let relayAddr =
try:
redisClient.bLPop(@["RELAY_TCP_ADDRESS"], 0)
except Exception as e:
raise newException(CatchableError, e.msg)
debug "All relay addresses", relayAddr
# This is necessary to make the autonat service work. It will ask this peer for our reachability which the autonat
# client stub will answer NotReachable.
await switch.connect(auxSwitch.peerInfo.peerId, auxSwitch.peerInfo.addrs)
# Wait for autonat to be NotReachable
while autonatService.networkReachability != NetworkReachability.NotReachable:
await sleepAsync(100.milliseconds)
# This will trigger the autonat relay service to make a reservation.
let relayMA = MultiAddress.init(relayAddr[1]).tryGet()
try:
let relayClient = RelayClient.new()
let autoRelayService = AutoRelayService.new(1, relayClient, nil, newRng())
let autonatClientStub = AutonatClientStub.new(expectedDials = 1)
autonatClientStub.answer = NotReachable
let autonatService =
AutonatService.new(autonatClientStub, newRng(), maxQueueSize = 1)
let hpservice = HPService.new(autonatService, autoRelayService)
debug "Dialing relay...", relayMA
let relayId = await switch.connect(relayMA).wait(30.seconds)
debug "Connected to relay", relayId
except AsyncTimeoutError:
raise newException(CatchableError, "Connection to relay timed out")
let
isListener = getEnv("MODE") == "listen"
switch = createSwitch(relayClient, hpservice)
auxSwitch = createSwitch()
redisClient = open("redis", 6379.Port)
# Wait for our relay address to be published
while not switch.peerInfo.addrs.anyIt(it.contains(multiCodec("p2p-circuit")).tryGet()):
await sleepAsync(100.milliseconds)
debug "Connected to redis"
if isListener:
let listenerPeerId = switch.peerInfo.peerId
discard redisClient.rPush("LISTEN_CLIENT_PEER_ID", $listenerPeerId)
debug "Pushed listener client peer id to redis", listenerPeerId
await switch.start()
await auxSwitch.start()
let relayAddr =
# Nothing to do anymore, wait to be killed
await sleepAsync(2.minutes)
else:
let listenerId =
try:
redisClient.bLPop(@["RELAY_TCP_ADDRESS"], 0)
PeerId.init(redisClient.bLPop(@["LISTEN_CLIENT_PEER_ID"], 0)[1]).tryGet()
except Exception as e:
raise newException(CatchableError, e.msg)
# This is necessary to make the autonat service work. It will ask this peer for our reachability which the autonat
# client stub will answer NotReachable.
await switch.connect(auxSwitch.peerInfo.peerId, auxSwitch.peerInfo.addrs)
debug "Got listener peer id", listenerId
let listenerRelayAddr = MultiAddress.init($relayMA & "/p2p-circuit").tryGet()
# Wait for autonat to be NotReachable
while autonatService.networkReachability != NetworkReachability.NotReachable:
await sleepAsync(100.milliseconds)
debug "Dialing listener relay address", listenerRelayAddr
await switch.connect(listenerId, @[listenerRelayAddr])
# This will trigger the autonat relay service to make a reservation.
let relayMA = MultiAddress.init(relayAddr[1]).tryGet()
debug "Got relay address", relayMA
let relayId = await switch.connect(relayMA)
debug "Connected to relay", relayId
# wait for hole-punching to complete in the background
await sleepAsync(5000.milliseconds)
# Wait for our relay address to be published
while not switch.peerInfo.addrs.anyIt(
it.contains(multiCodec("p2p-circuit")).tryGet()
let conn = switch.connManager.selectMuxer(listenerId).connection
let channel = await switch.dial(listenerId, @[listenerRelayAddr], PingCodec)
let delay = await Ping.new().ping(channel)
await allFuturesThrowing(
channel.close(), conn.close(), switch.stop(), auxSwitch.stop()
)
:
await sleepAsync(100.milliseconds)
echo &"""{{"rtt_to_holepunched_peer_millis":{delay.millis}}}"""
if isListener:
let listenerPeerId = switch.peerInfo.peerId
discard redisClient.rPush("LISTEN_CLIENT_PEER_ID", $listenerPeerId)
debug "Pushed listener client peer id to redis", listenerPeerId
try:
proc mainAsync(): Future[string] {.async.} =
# mainAsync wraps main and returns some value, as otherwise
# 'waitFor(fut)' has no type (or is ambiguous)
await main()
return "done"
# Nothing to do anymore, wait to be killed
await sleepAsync(2.minutes)
else:
let listenerId =
try:
PeerId.init(redisClient.bLPop(@["LISTEN_CLIENT_PEER_ID"], 0)[1]).tryGet()
except Exception as e:
raise newException(CatchableError, e.msg)
debug "Got listener peer id", listenerId
let listenerRelayAddr = MultiAddress.init($relayMA & "/p2p-circuit").tryGet()
debug "Dialing listener relay address", listenerRelayAddr
await switch.connect(listenerId, @[listenerRelayAddr])
# wait for hole-punching to complete in the background
await sleepAsync(5000.milliseconds)
let conn = switch.connManager.selectMuxer(listenerId).connection
let channel = await switch.dial(listenerId, @[listenerRelayAddr], PingCodec)
let delay = await Ping.new().ping(channel)
await allFuturesThrowing(
channel.close(), conn.close(), switch.stop(), auxSwitch.stop()
)
echo &"""{{"rtt_to_holepunched_peer_millis":{delay.millis}}}"""
quit(0)
except CatchableError as e:
error "Unexpected error", description = e.msg
discard waitFor(main().withTimeout(4.minutes))
quit(1)
discard waitFor(mainAsync().wait(4.minutes))
except AsyncTimeoutError:
error "Program execution timed out."
quit(-1)
except CatchableError as e:
error "Unexpected error", description = e.msg
quit(-1)

View File

@@ -9,12 +9,11 @@
{.used.}
import sequtils, options, tables, sets
import sequtils, tables, sets
import chronos, stew/byteutils
import
utils,
../../libp2p/[
errors,
switch,
stream/connection,
crypto/crypto,

View File

@@ -22,88 +22,44 @@ suite "GossipSub Fanout Management":
checkTrackers()
asyncTest "`replenishFanout` Degree Lo":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
var peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
gossipSub.gossipsub[topic].incl(peer)
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.gossipsub[topic].len == 15
gossipSub.replenishFanout(topic)
check gossipSub.fanout[topic].len == gossipSub.parameters.d
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "`dropFanoutPeers` drop expired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
await sleepAsync(5.millis) # allow the topic to expire
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(6, topic, populateGossipsub = true, populateFanout = true)
defer:
await teardownGossipSub(gossipSub, conns)
var conns = newSeq[Connection]()
for i in 0 ..< 6:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
gossipSub.fanout[topic].incl(peer)
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
await sleepAsync(5.millis) # allow the topic to expire
check gossipSub.fanout[topic].len == gossipSub.parameters.d
gossipSub.dropFanoutPeers()
check topic notin gossipSub.fanout
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let
topic1 = "foobar1"
topic2 = "foobar2"
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
6, @[topic1, topic2], populateGossipsub = true, populateFanout = true
)
defer:
await teardownGossipSub(gossipSub, conns)
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic1 = "foobar1"
let topic2 = "foobar2"
gossipSub.topicParams[topic1] = TopicParams.init()
gossipSub.topicParams[topic2] = TopicParams.init()
gossipSub.fanout[topic1] = initHashSet[PubSubPeer]()
gossipSub.fanout[topic2] = initHashSet[PubSubPeer]()
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis)
gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes)
await sleepAsync(5.millis) # allow the topic to expire
var conns = newSeq[Connection]()
for i in 0 ..< 6:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
gossipSub.fanout[topic1].incl(peer)
gossipSub.fanout[topic2].incl(peer)
await sleepAsync(5.millis) # allow first topic to expire
check gossipSub.fanout[topic1].len == gossipSub.parameters.d
check gossipSub.fanout[topic2].len == gossipSub.parameters.d
@@ -112,14 +68,8 @@ suite "GossipSub Fanout Management":
check topic1 notin gossipSub.fanout
check topic2 in gossipSub.fanout
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "e2e - GossipSub send over fanout A -> B":
var passed = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
passed.complete()
let (passed, handler) = createCompleteHandler()
let nodes = generateNodes(2, gossip = true)
@@ -153,15 +103,12 @@ suite "GossipSub Fanout Management":
gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId)
not gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId)
await passed.wait(2.seconds)
discard await passed.wait(2.seconds)
check observed == 2
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
var passed = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async.} =
check topic == "foobar"
passed.complete()
let (passed, handler) = createCompleteHandler()
let nodes = generateNodes(2, gossip = true, unsubscribeBackoff = 10.minutes)
@@ -191,6 +138,6 @@ suite "GossipSub Fanout Management":
GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
await passed.wait(2.seconds)
discard await passed.wait(2.seconds)
trace "test done, stopping..."

View File

@@ -24,26 +24,14 @@ suite "GossipSub Gossip Protocol":
checkTrackers()
asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]()
let (gossipSub, conns, peers) = setupGossipSubWithPeers(45, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
# generate mesh and fanout peers
for i in 0 ..< 30:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
let peer = peers[i]
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer)
else:
@@ -51,57 +39,36 @@ suite "GossipSub Gossip Protocol":
gossipSub.mesh[topic].incl(peer)
# generate gossipsub (free standing) peers
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
for i in 30 ..< 45:
let peer = peers[i]
gossipSub.gossipsub[topic].incl(peer)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let conn = conns[i]
inc seqno
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
check gossipSub.fanout[topic].len == 15
check gossipSub.mesh[topic].len == 15
check gossipSub.gossipsub[topic].len == 15
let peers = gossipSub.getGossipPeers()
check peers.len == gossipSub.parameters.d
for p in peers.keys:
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
for p in gossipPeers.keys:
check not gossipSub.fanout.hasPeerId(topic, p.peerId)
check not gossipSub.mesh.hasPeerId(topic, p.peerId)
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]()
for i in 0 ..< 30:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer)
else:
@@ -110,38 +77,22 @@ suite "GossipSub Gossip Protocol":
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let conn = conns[i]
inc seqno
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let peers = gossipSub.getGossipPeers()
check peers.len == gossipSub.parameters.d
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]()
for i in 0 ..< 30:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer)
gossipSub.grafted(peer, topic)
@@ -151,38 +102,22 @@ suite "GossipSub Gossip Protocol":
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let conn = conns[i]
inc seqno
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let peers = gossipSub.getGossipPeers()
check peers.len == gossipSub.parameters.d
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == gossipSub.parameters.d
asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]()
for i in 0 ..< 30:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate mesh and fanout peers
for i, peer in peers:
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer)
gossipSub.grafted(peer, topic)
@@ -192,47 +127,22 @@ suite "GossipSub Gossip Protocol":
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let conn = conns[i]
inc seqno
let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let peers = gossipSub.getGossipPeers()
check peers.len == 0
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
let gossipPeers = gossipSub.getGossipPeers()
check gossipPeers.len == 0
asyncTest "handleIHave/Iwant tests":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
check false
proc handler2(topic: string, data: seq[byte]) {.async.} =
discard
let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.subscribe(topic, handler2)
var (gossipSub, conns, peers) =
setupGossipSubWithPeers(30, topic, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
# Instantiates 30 peers and connects all of them to the previously defined `gossipSub`
for i in 0 ..< 30:
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
# Add the connection to `gossipSub`, to their `gossipSub.gossipsub` and `gossipSub.mesh` tables
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
gossipSub.subscribe(topic, voidTopicHandler)
# Peers with no budget should not request messages
block:
@@ -296,9 +206,6 @@ suite "GossipSub Gossip Protocol":
check gossipSub.mcache.msgs.len == 1
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "messages sent to peers not in the mesh are propagated via gossip":
let
numberOfNodes = 5
@@ -715,3 +622,130 @@ suite "GossipSub Gossip Protocol":
gossip0.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
checkUntilTimeout:
gossip1.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
asyncTest "IHAVE messages correctly advertise message ID to peers":
# Given 2 nodes
let
topic = "foo"
messageID = @[0'u8, 1, 2, 3]
ihaveMessage =
ControlMessage(ihave: @[ControlIHave(topicID: topic, messageIDs: @[messageID])])
numberOfNodes = 2
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
.toGossipSub()
n0 = nodes[0]
n1 = nodes[1]
startNodesAndDeferStop(nodes)
# Given node1 has an IHAVE observer
var receivedIHave = newFuture[(string, seq[MessageId])]()
let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
for msg in msgs.control.get.ihave:
receivedIHave.complete((msg.topicID, msg.messageIDs))
n1.addObserver(PubSubObserver(onRecv: checkForIhaves))
# And the nodes are connected
await connectNodesStar(nodes)
# And both subscribe to the topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
check:
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
# When an IHAVE message is sent
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
n0.broadcast(@[p1], RPCMsg(control: some(ihaveMessage)), isHighPriority = false)
await waitForHeartbeat()
# Then the peer has the message ID
let r = await receivedIHave.waitForState(HEARTBEAT_TIMEOUT)
check:
r.isCompleted((topic, @[messageID]))
asyncTest "IWANT messages correctly request messages by their IDs":
# Given 2 nodes
let
topic = "foo"
messageID = @[0'u8, 1, 2, 3]
iwantMessage = ControlMessage(iwant: @[ControlIWant(messageIDs: @[messageID])])
numberOfNodes = 2
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
.toGossipSub()
n0 = nodes[0]
n1 = nodes[1]
startNodesAndDeferStop(nodes)
# Given node1 has an IWANT observer
var receivedIWant = newFuture[seq[MessageId]]()
let checkForIwants = proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
for msg in msgs.control.get.iwant:
receivedIWant.complete(msg.messageIDs)
n1.addObserver(PubSubObserver(onRecv: checkForIwants))
# And the nodes are connected
await connectNodesStar(nodes)
# And both subscribe to the topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
check:
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
# When an IWANT message is sent
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
n0.broadcast(@[p1], RPCMsg(control: some(iwantMessage)), isHighPriority = false)
await waitForHeartbeat()
# Then the peer has the message ID
let r = await receivedIWant.waitForState(HEARTBEAT_TIMEOUT)
check:
r.isCompleted(@[messageID])
asyncTest "IHAVE for message not held by peer triggers IWANT response to sender":
# Given 2 nodes
let
topic = "foo"
messageID = @[0'u8, 1, 2, 3]
ihaveMessage =
ControlMessage(ihave: @[ControlIHave(topicID: topic, messageIDs: @[messageID])])
numberOfNodes = 2
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
.toGossipSub()
n0 = nodes[0]
n1 = nodes[1]
startNodesAndDeferStop(nodes)
# Given node1 has an IWANT observer
var receivedIWant = newFuture[seq[MessageId]]()
let checkForIwants = proc(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome:
for msg in msgs.control.get.iwant:
receivedIWant.complete(msg.messageIDs)
n0.addObserver(PubSubObserver(onRecv: checkForIwants))
# And the nodes are connected
await connectNodesStar(nodes)
# And both nodes subscribe to the topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
# When an IHAVE message is sent from node0
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
n0.broadcast(@[p1], RPCMsg(control: some(ihaveMessage)), isHighPriority = false)
await waitForHeartbeat()
# Then node0 should receive an IWANT message from node1 (as node1 doesn't have the message)
let iWantResult = await receivedIWant.waitForState(HEARTBEAT_TIMEOUT)
check:
iWantResult.isCompleted(@[messageID])

View File

@@ -25,28 +25,14 @@ suite "GossipSub Mesh Management":
params.validateParameters().tryGet()
asyncTest "subscribe/unsubscribeAll":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
discard
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
# test via dynamic dispatch
gossipSub.PubSub.subscribe(topic, handler)
gossipSub.PubSub.subscribe(topic, voidTopicHandler)
check:
gossipSub.topics.contains(topic)
@@ -61,53 +47,27 @@ suite "GossipSub Mesh Management":
topic notin gossipSub.mesh # not in mesh
topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "`rebalanceMesh` Degree Lo":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == gossipSub.parameters.d
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "rebalanceMesh - bad peers":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var scoreLow = -11'f64
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
for peer in peers:
peer.score = scoreLow
gossipSub.gossipsub[topic].incl(peer)
scoreLow += 1.0
check gossipSub.peers.len == 15
@@ -117,54 +77,28 @@ suite "GossipSub Mesh Management":
for peer in gossipSub.mesh[topic]:
check peer.score >= 0.0
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "`rebalanceMesh` Degree Hi":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.mesh[topic].len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len ==
gossipSub.parameters.d + gossipSub.parameters.dScore
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "rebalanceMesh fail due to backoff":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
for peer in peers:
gossipSub.backingOff.mgetOrPut(topic, initTable[PeerId, Moment]()).add(
peerId, Moment.now() + 1.hours
peer.peerId, Moment.now() + 1.hours
)
let prunes = gossipSub.handleGraft(peer, @[ControlGraft(topicID: topic)])
# there must be a control prune due to violation of backoff
@@ -175,34 +109,18 @@ suite "GossipSub Mesh Management":
# expect 0 since they are all backing off
check gossipSub.mesh[topic].len == 0
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "rebalanceMesh fail due to backoff - remote":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
gossipSub.mesh[topic].incl(peer)
let (gossipSub, conns, peers) =
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
defer:
await teardownGossipSub(gossipSub, conns)
check gossipSub.peers.len == 15
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len != 0
for i in 0 ..< 15:
let peerId = conns[i].peerId
let peer = gossipSub.getPubSubPeer(peerId)
for peer in peers:
gossipSub.handlePrune(
peer,
@[
@@ -217,45 +135,36 @@ suite "GossipSub Mesh Management":
# expect topic cleaned up since they are all pruned
check topic notin gossipSub.mesh
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "rebalanceMesh Degree Hi - audit scenario":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
let
topic = "foobar"
numInPeers = 6
numOutPeers = 7
totalPeers = numInPeers + numOutPeers
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
totalPeers, topic, populateGossipsub = true, populateMesh = true
)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.parameters.dScore = 4
gossipSub.parameters.d = 6
gossipSub.parameters.dOut = 3
gossipSub.parameters.dHigh = 12
gossipSub.parameters.dLow = 4
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< 6:
let conn = TestBufferStream.new(noop)
for i in 0 ..< numInPeers:
let conn = conns[i]
let peer = peers[i]
conn.transportDir = Direction.In
conns &= conn
let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.score = 40.0
peer.sendConn = conn
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
for i in 0 ..< 7:
let conn = TestBufferStream.new(noop)
for i in numInPeers ..< totalPeers:
let conn = conns[i]
let peer = peers[i]
conn.transportDir = Direction.Out
conns &= conn
let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.score = 10.0
peer.sendConn = conn
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
check gossipSub.mesh[topic].len == 13
gossipSub.rebalanceMesh(topic)
@@ -268,9 +177,6 @@ suite "GossipSub Mesh Management":
# ensure we give priority and keep at least dOut outbound peers
check outbound >= gossipSub.parameters.dOut
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "dont prune peers if mesh len is less than d_high":
let
numberOfNodes = 5
@@ -520,3 +426,264 @@ suite "GossipSub Mesh Management":
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
asyncTest "mesh and gossipsub updated when topic subscribed and unsubscribed":
let
numberOfNodes = 5
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
startNodesAndDeferStop(nodes)
# When all of them are connected and subscribed to the same topic
await connectNodesStar(nodes)
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
# Then mesh and gossipsub should be populated
for node in nodes:
check node.topics.contains(topic)
check node.gossipsub.hasKey(topic)
check node.gossipsub[topic].len() == numberOfNodes - 1
check node.mesh.hasKey(topic)
check node.mesh[topic].len() == numberOfNodes - 1
# When all nodes unsubscribe from the topic
unsubscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
# Then the topic should be removed from mesh and gossipsub
for node in nodes:
check topic notin node.topics
check topic notin node.mesh
check topic notin node.gossipsub
asyncTest "handle subscribe and unsubscribe for multiple topics":
let
numberOfNodes = 3
topics = @["foobar1", "foobar2", "foobar3"]
nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
startNodesAndDeferStop(nodes)
# When nodes subscribe to multiple topics
await connectNodesStar(nodes)
for topic in topics:
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
# Then all nodes should be subscribed to the topics initially
for node in nodes:
for topic in topics:
check node.topics.contains(topic)
check node.gossipsub[topic].len() == numberOfNodes - 1
check node.mesh[topic].len() == numberOfNodes - 1
# When they unsubscribe from all topics
for topic in topics:
unsubscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
# Then topics should be removed from mesh and gossipsub
for node in nodes:
for topic in topics:
check topic notin node.topics
check topic notin node.mesh
check topic notin node.gossipsub
asyncTest "GRAFT messages correctly add peers to mesh":
# Given 2 nodes
let
topic = "foobar"
graftMessage = ControlMessage(graft: @[ControlGraft(topicID: topic)])
numberOfNodes = 2
# First part of the hack: Weird dValues so peers are not GRAFTed automatically
dValues = DValues(dLow: some(0), dHigh: some(0), d: some(0), dOut: some(-1))
nodes = generateNodes(
numberOfNodes, gossip = true, verifySignature = false, dValues = some(dValues)
)
.toGossipSub()
n0 = nodes[0]
n1 = nodes[1]
startNodesAndDeferStop(nodes)
# And the nodes are connected
await connectNodesStar(nodes)
# And both subscribe to the topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
# Because of the hack-ish dValues, the peers are added to gossipsub but not GRAFTed to mesh
check:
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
# Stop both nodes in order to prevent GRAFT message to be sent by heartbeat
await n0.stop()
await n1.stop()
# Second part of the hack
# Set values so peers can be GRAFTed
let newDValues =
some(DValues(dLow: some(1), dHigh: some(1), d: some(1), dOut: some(1)))
n0.parameters.applyDValues(newDValues)
n1.parameters.applyDValues(newDValues)
# When a GRAFT message is sent
let p0 = n1.getOrCreatePeer(n0.peerInfo.peerId, @[GossipSubCodec_12])
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
n0.broadcast(@[p1], RPCMsg(control: some(graftMessage)), isHighPriority = false)
n1.broadcast(@[p0], RPCMsg(control: some(graftMessage)), isHighPriority = false)
await waitForPeersInTable(
nodes, topic, newSeqWith(numberOfNodes, 1), PeerTableType.Mesh
)
# Then the peers are GRAFTed
check:
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
asyncTest "Received GRAFT for non-subscribed topic":
# Given 2 nodes
let
topic = "foo"
graftMessage = ControlMessage(graft: @[ControlGraft(topicID: topic)])
numberOfNodes = 2
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
.toGossipSub()
n0 = nodes[0]
n1 = nodes[1]
startNodesAndDeferStop(nodes)
# And the nodes are connected
await connectNodesStar(nodes)
# And only node0 subscribes to the topic
nodes[0].subscribe(topic, voidTopicHandler)
await waitForHeartbeat()
check:
n0.topics.hasKey(topic)
not n1.topics.hasKey(topic)
not n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
# When a GRAFT message is sent
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
n0.broadcast(@[p1], RPCMsg(control: some(graftMessage)), isHighPriority = false)
await waitForHeartbeat()
# Then the peer is not GRAFTed
check:
n0.topics.hasKey(topic)
not n1.topics.hasKey(topic)
not n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
asyncTest "PRUNE messages correctly removes peers from mesh":
# Given 2 nodes
let
topic = "foo"
backoff = 1
pruneMessage = ControlMessage(
prune: @[ControlPrune(topicID: topic, peers: @[], backoff: uint64(backoff))]
)
numberOfNodes = 2
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
.toGossipSub()
n0 = nodes[0]
n1 = nodes[1]
startNodesAndDeferStop(nodes)
# And the nodes are connected
await connectNodesStar(nodes)
# And both subscribe to the topic
subscribeAllNodes(nodes, topic, voidTopicHandler)
await waitForHeartbeat()
check:
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
# When a PRUNE message is sent
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
n0.broadcast(@[p1], RPCMsg(control: some(pruneMessage)), isHighPriority = false)
await waitForHeartbeat()
# Then the peer is PRUNEd
check:
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
# When another PRUNE message is sent
let p0 = n1.getOrCreatePeer(n0.peerInfo.peerId, @[GossipSubCodec_12])
n1.broadcast(@[p0], RPCMsg(control: some(pruneMessage)), isHighPriority = false)
await waitForHeartbeat()
# Then the peer is PRUNEd
check:
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
asyncTest "Received PRUNE for non-subscribed topic":
# Given 2 nodes
let
topic = "foo"
pruneMessage =
ControlMessage(prune: @[ControlPrune(topicID: topic, peers: @[], backoff: 1)])
numberOfNodes = 2
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
.toGossipSub()
n0 = nodes[0]
n1 = nodes[1]
startNodesAndDeferStop(nodes)
# And the nodes are connected
await connectNodesStar(nodes)
# And only node0 subscribes to the topic
n0.subscribe(topic, voidTopicHandler)
await waitForHeartbeat()
check:
n0.topics.hasKey(topic)
not n1.topics.hasKey(topic)
not n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
# When a PRUNE message is sent
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
n0.broadcast(@[p1], RPCMsg(control: some(pruneMessage)), isHighPriority = false)
await waitForHeartbeat()
# Then the peer is not PRUNEd
check:
n0.topics.hasKey(topic)
not n1.topics.hasKey(topic)
not n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)

View File

@@ -16,7 +16,6 @@ import sugar
import chronicles
import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, timedcache]
import ../../libp2p/protocols/pubsub/rpc/[message, protobuf]
import ../../libp2p/muxers/muxer
import ../helpers, ../utils/[futures]
const MsgIdSuccess = "msg id gen success"
@@ -78,38 +77,22 @@ suite "GossipSub Message Handling":
checkTrackers()
asyncTest "Drop messages of topics without subscription":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
check false
let topic = "foobar"
var conns = newSeq[Connection]()
for i in 0 ..< 30:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
var (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
defer:
await teardownGossipSub(gossipSub, conns)
# generate messages
var seqno = 0'u64
for i in 0 .. 5:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
let conn = conns[i]
let peer = peers[i]
inc seqno
let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
let msg = Message.init(conn.peerId, ("bar" & $i).toBytes(), topic, some(seqno))
await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false))
check gossipSub.mcache.msgs.len == 0
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "subscription limits":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.topicsHigh = 10
@@ -859,3 +842,34 @@ suite "GossipSub Message Handling":
publishResult == 0
results[0].isPending()
results[1].isPending()
# check correctly parsed ihave/iwant/graft/prune/idontwant messages
# check value before & after decoding equal using protoc cmd tool for reference
asyncTest "ControlMessage RPCMsg encoding and decoding":
let id: seq[byte] = @[123]
let message = RPCMsg(
control: some(
ControlMessage(
ihave: @[ControlIHave(topicID: "foobar", messageIDs: @[id])],
iwant: @[ControlIWant(messageIDs: @[id])],
graft: @[ControlGraft(topicID: "foobar")],
prune: @[ControlPrune(topicID: "foobar", backoff: 10.uint64)],
idontwant: @[ControlIWant(messageIDs: @[id])],
)
)
)
#data encoded using protoc cmd tool
let expectedEncoded: seq[byte] =
@[
26, 45, 10, 11, 10, 6, 102, 111, 111, 98, 97, 114, 18, 1, 123, 18, 3, 10, 1,
123, 26, 8, 10, 6, 102, 111, 111, 98, 97, 114, 34, 10, 10, 6, 102, 111, 111, 98,
97, 114, 24, 10, 42, 3, 10, 1, 123,
]
let actualEncoded = encodeRpcMsg(message, true)
check:
actualEncoded == expectedEncoded
let actualDecoded = decodeRpcMsg(expectedEncoded).value
check:
actualDecoded == message

View File

@@ -23,24 +23,18 @@ suite "GossipSub Scoring":
checkTrackers()
asyncTest "Disconnect bad peers":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
var (gossipSub, conns, peers) =
setupGossipSubWithPeers(30, topic, populateGossipsub = true)
defer:
await teardownGossipSub(gossipSub, conns)
gossipSub.parameters.disconnectBadPeers = true
gossipSub.parameters.appSpecificWeight = 1.0
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
check false
let topic = "foobar"
var conns = newSeq[Connection]()
for i in 0 ..< 30:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
peer.handler = handler
for i, peer in peers:
peer.appScore = gossipSub.parameters.graylistThreshold - 1
gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
let conn = conns[i]
gossipSub.switch.connManager.storeMuxer(Muxer(connection: conn))
gossipSub.updateScores()
@@ -53,9 +47,6 @@ suite "GossipSub Scoring":
# also ensure we cleanup properly the peersInIP table
gossipSub.peersInIP.len == 0
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "flood publish to all peers with score above threshold, regardless of subscription":
let
numberOfNodes = 3

View File

@@ -5,7 +5,7 @@ const
libp2p_pubsub_anonymize {.booldefine.} = false
import hashes, random, tables, sets, sequtils, sugar
import chronos, stew/[byteutils, results], chronos/ratelimit
import chronos, results, stew/byteutils, chronos/ratelimit
import
../../libp2p/[
builders,
@@ -18,7 +18,7 @@ import
protocols/pubsub/rpc/messages,
protocols/secure/secure,
]
import ../helpers, ../utils/futures
import ../helpers
import chronicles
export builders
@@ -42,6 +42,21 @@ type
dOut*: Option[int]
dLazy*: Option[int]
proc noop*(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
proc voidTopicHandler*(topic: string, data: seq[byte]) {.async.} =
discard
proc voidPeerHandler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
proc randomPeerId*(): PeerId =
try:
PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
except CatchableError as exc:
raise newException(Defect, exc.msg)
proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
proc getConn(): Future[Connection] {.
async: (raises: [CancelledError, GetConnDialError])
@@ -61,11 +76,57 @@ proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
onNewPeer(p, pubSubPeer)
pubSubPeer
proc randomPeerId*(): PeerId =
try:
PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
except CatchableError as exc:
raise newException(Defect, exc.msg)
proc setupGossipSubWithPeers*(
numPeers: int,
topics: seq[string],
populateGossipsub: bool = false,
populateMesh: bool = false,
populateFanout: bool = false,
): (TestGossipSub, seq[Connection], seq[PubSubPeer]) =
let gossipSub = TestGossipSub.init(newStandardSwitch())
for topic in topics:
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]()
var peers = newSeq[PubSubPeer]()
for i in 0 ..< numPeers:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
peer.handler = voidPeerHandler
peers &= peer
for topic in topics:
if (populateGossipsub):
gossipSub.gossipsub[topic].incl(peer)
if (populateMesh):
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
if (populateFanout):
gossipSub.fanout[topic].incl(peer)
return (gossipSub, conns, peers)
proc setupGossipSubWithPeers*(
numPeers: int,
topic: string,
populateGossipsub: bool = false,
populateMesh: bool = false,
populateFanout: bool = false,
): (TestGossipSub, seq[Connection], seq[PubSubPeer]) =
return setupGossipSubWithPeers(
numPeers, @[topic], populateGossipsub, populateMesh, populateFanout
)
proc teardownGossipSub*(gossipSub: TestGossipSub, conns: seq[Connection]) {.async.} =
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] =
let mid =
@@ -78,7 +139,7 @@ func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] =
$m.data.hash & $m.topic.hash
ok mid.toBytes()
proc applyDValues(parameters: var GossipSubParams, dValues: Option[DValues]) =
proc applyDValues*(parameters: var GossipSubParams, dValues: Option[DValues]) =
if dValues.isNone:
return
let values = dValues.get
@@ -168,18 +229,21 @@ proc generateNodes*(
switch.mount(pubsub)
result.add(pubsub)
proc connectNodes*(dialer: PubSub, target: PubSub) {.async.} =
proc toGossipSub*(nodes: seq[PubSub]): seq[GossipSub] =
return nodes.mapIt(GossipSub(it))
proc connectNodes*[T: PubSub](dialer: T, target: T) {.async.} =
doAssert dialer.switch.peerInfo.peerId != target.switch.peerInfo.peerId,
"Could not connect same peer"
await dialer.switch.connect(target.peerInfo.peerId, target.peerInfo.addrs)
proc connectNodesStar*(nodes: seq[PubSub]) {.async.} =
proc connectNodesStar*[T: PubSub](nodes: seq[T]) {.async.} =
for dialer in nodes:
for node in nodes:
if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
await connectNodes(dialer, node)
proc connectNodesSparse*(nodes: seq[PubSub], degree: int = 2) {.async.} =
proc connectNodesSparse*[T: PubSub](nodes: seq[T], degree: int = 2) {.async.} =
if nodes.len < degree:
raise
(ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
@@ -324,23 +388,31 @@ proc waitForPeersInTable*(
)
allSatisfied = checkState(nodes, topic, peerCounts, table, satisfied)
proc startNodes*(nodes: seq[PubSub]) {.async.} =
proc startNodes*[T: PubSub](nodes: seq[T]) {.async.} =
await allFuturesThrowing(nodes.mapIt(it.switch.start()))
proc stopNodes*(nodes: seq[PubSub]) {.async.} =
proc stopNodes*[T: PubSub](nodes: seq[T]) {.async.} =
await allFuturesThrowing(nodes.mapIt(it.switch.stop()))
template startNodesAndDeferStop*(nodes: seq[PubSub]): untyped =
template startNodesAndDeferStop*[T: PubSub](nodes: seq[T]): untyped =
await startNodes(nodes)
defer:
await stopNodes(nodes)
proc subscribeAllNodes*(nodes: seq[PubSub], topic: string, topicHandler: TopicHandler) =
proc subscribeAllNodes*[T: PubSub](
nodes: seq[T], topic: string, topicHandler: TopicHandler
) =
for node in nodes:
node.subscribe(topic, topicHandler)
proc subscribeAllNodes*(
nodes: seq[PubSub], topic: string, topicHandlers: seq[TopicHandler]
proc unsubscribeAllNodes*[T: PubSub](
nodes: seq[T], topic: string, topicHandler: TopicHandler
) =
for node in nodes:
node.unsubscribe(topic, topicHandler)
proc subscribeAllNodes*[T: PubSub](
nodes: seq[T], topic: string, topicHandlers: seq[TopicHandler]
) =
if nodes.len != topicHandlers.len:
raise (ref CatchableError)(msg: "nodes and topicHandlers count needs to match!")
@@ -360,12 +432,6 @@ template tryPublish*(
doAssert pubs >= require, "Failed to publish!"
proc noop*(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
proc voidTopicHandler*(topic: string, data: seq[byte]) {.async.} =
discard
proc createCompleteHandler*(): (
Future[bool], proc(topic: string, data: seq[byte]) {.async.}
) =

View File

@@ -11,8 +11,9 @@
{.push raises: [].}
import std/net
import tables
import chronos, stew/[byteutils, endians2, shims/net]
import chronos, stew/[byteutils, endians2]
import
../../libp2p/[
stream/connection,
@@ -62,7 +63,8 @@ proc start*(self: TorServerStub, address: TransportAddress) {.async.} =
var ip: array[4, byte]
for i, e in msg[0 ..^ 3]:
ip[i] = e
$(ipv4(ip)) & ":" & $(Port(fromBytesBE(uint16, msg[^2 ..^ 1])))
$(IpAddress(family: IPv4, address_v4: ip)) & ":" &
$(Port(fromBytesBE(uint16, msg[^2 ..^ 1])))
of Socks5AddressType.IPv6.byte:
let n = 16 + 2 # +2 bytes for the port
msg = newSeq[byte](n) # +2 bytes for the port
@@ -70,7 +72,8 @@ proc start*(self: TorServerStub, address: TransportAddress) {.async.} =
var ip: array[16, byte]
for i, e in msg[0 ..^ 3]:
ip[i] = e
$(ipv6(ip)) & ":" & $(Port(fromBytesBE(uint16, msg[^2 ..^ 1])))
$(IpAddress(family: IPv6, address_v6: ip)) & ":" &
$(Port(fromBytesBE(uint16, msg[^2 ..^ 1])))
of Socks5AddressType.FQDN.byte:
await connSrc.readExactly(addr msg[0], 1)
let n = int(uint8.fromBytes(msg[0 .. 0])) + 2 # +2 bytes for the port

View File

@@ -10,7 +10,7 @@
# those terms.
import chronos, stew/byteutils
import ../libp2p/stream/bufferstream, ../libp2p/stream/lpstream, ../libp2p/errors
import ../libp2p/stream/bufferstream, ../libp2p/stream/lpstream
import ./helpers

View File

@@ -10,11 +10,9 @@
# those terms.
import std/[sequtils, tables]
import stew/results
import results
import chronos
import
../libp2p/
[connmanager, stream/connection, crypto/crypto, muxers/muxer, peerinfo, errors]
import ../libp2p/[connmanager, stream/connection, crypto/crypto, muxers/muxer, peerinfo]
import helpers

View File

@@ -18,7 +18,7 @@ import
discovery/discoverymngr,
discovery/rendezvousinterface,
]
import ./helpers, ./utils/[futures, async_tests]
import ./helpers, ./utils/async_tests
proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
SwitchBuilder

View File

@@ -1,6 +1,6 @@
import helpers, commoninterop
import ../libp2p
import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/[relay, client]
import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/relay
proc switchMplexCreator(
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),

View File

@@ -12,7 +12,7 @@
import sequtils, strutils
import chronos
import ../libp2p/[protocols/rendezvous, switch, builders]
import ../libp2p/discovery/[rendezvousinterface, discoverymngr]
import ../libp2p/discovery/discoverymngr
import ./helpers
proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =

View File

@@ -9,8 +9,7 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/[options, sequtils]
import stew/[byteutils]
import std/options
import chronos, metrics
import unittest2
import ../libp2p/[builders, switch]

View File

@@ -13,6 +13,6 @@ COPY . nim-libp2p/
RUN \
cd nim-libp2p && \
nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim
nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:libp2p_quic_support -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim
ENTRYPOINT ["/app/nim-libp2p/tests/transport-interop/main"]

View File

@@ -99,7 +99,18 @@ proc main() {.async.} =
pingRTTMilllis: float(pingDelay.milliseconds),
)
)
quit(0)
discard waitFor(main().withTimeout(testTimeout))
quit(1)
try:
proc mainAsync(): Future[string] {.async.} =
# mainAsync wraps main and returns some value, as otherwise
# 'waitFor(fut)' has no type (or is ambiguous)
await main()
return "done"
discard waitFor(mainAsync().wait(testTimeout))
except AsyncTimeoutError:
error "Program execution timed out."
quit(-1)
except CatchableError as e:
error "Unexpected error", description = e.msg
quit(-1)

View File

@@ -1,6 +1,8 @@
{.used.}
import unittest2
import times, base64
import times
import ../../../libp2p/transports/tls/certificate
import ../../../libp2p/transports/tls/certificate_ffi
import ../../../libp2p/crypto/crypto

View File

@@ -1,4 +1,4 @@
import chronos/futures, stew/results, chronos, sequtils
import chronos/futures, results, chronos, sequtils
const
DURATION_TIMEOUT* = 1.seconds