Mirror of https://github.com/vacp2p/nim-libp2p.git (synced 2026-01-10 17:18:27 -05:00)

Compare commits: test-non-m ... test-branc (20 commits)
Commits:

- fd4100e19c
- cd60b254a0
- b88cdcdd4b
- 4a5e06cb45
- fff3a7ad1f
- 05c894d487
- 8850e9ccd9
- 2746531851
- 2856db5490
- b29e78ccae
- c9761c3588
- e4ef21e07c
- 61429aa0d6
- c1ef011556
- cd1424c09f
- 878d627f93
- 1d6385ddc5
- 873f730b4e
- 1c1547b137
- 9997f3e3d3
.github/workflows/ci.yml (6 changed lines, vendored)

@@ -14,7 +14,7 @@ concurrency:

 jobs:
 test:
-timeout-minutes: 90
+timeout-minutes: 40
 strategy:
 fail-fast: false
 matrix:
@@ -36,6 +36,8 @@ jobs:
 memory_management: refc
 - ref: version-2-0
 memory_management: refc
+- ref: version-2-2
+memory_management: refc
 include:
 - platform:
 os: linux
@@ -116,5 +118,5 @@ jobs:
 nimble --version
 gcc --version

-NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }}"
+export NIMFLAGS="${NIMFLAGS} -d:libp2p_quic_support --mm:${{ matrix.nim.memory_management }}"
 nimble test
.github/workflows/coverage.yml (2 changed lines, vendored)

@@ -51,7 +51,7 @@ jobs:

 - name: Run test suite with coverage flags
 run: |
-export NIMFLAGS="--lineDir:on --passC:-fprofile-arcs --passC:-ftest-coverage --passL:-fprofile-arcs --passL:-ftest-coverage"
+export NIMFLAGS="-d:libp2p_quic_support --lineDir:on --passC:-fprofile-arcs --passC:-ftest-coverage --passL:-fprofile-arcs --passL:-ftest-coverage"
 nimble testnative
 nimble testpubsub
 nimble testfilter
.github/workflows/daily_common.yml (4 changed lines, vendored)

@@ -36,7 +36,7 @@ jobs:

 test:
 needs: delete_cache
-timeout-minutes: 90
+timeout-minutes: 40
 strategy:
 fail-fast: false
 matrix:
@@ -97,5 +97,5 @@ jobs:
 dependency_solver="legacy"
 fi

-NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }} --solver:${dependency_solver}"
+export NIMFLAGS="${NIMFLAGS} -d:libp2p_quic_support --mm:${{ matrix.nim.memory_management }} --solver:${dependency_solver}"
 nimble test
.pinned (26 changed lines)

@@ -1,19 +1,19 @@
-bearssl;https://github.com/status-im/nim-bearssl@#667b40440a53a58e9f922e29e20818720c62d9ac
+bearssl;https://github.com/status-im/nim-bearssl@#34d712933a4e0f91f5e66bc848594a581504a215
 chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
 chronos;https://github.com/status-im/nim-chronos@#c04576d829b8a0a1b12baaa8bc92037501b3a4a0
 dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8
-faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309
-httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
-json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
+faststreams;https://github.com/status-im/nim-faststreams@#c51315d0ae5eb2594d0bf41181d0e1aca1b3c01d
+httputils;https://github.com/status-im/nim-http-utils@#79cbab1460f4c0cdde2084589d017c43a3d7b4f1
+json_serialization;https://github.com/status-im/nim-json-serialization@#2b1c5eb11df3647a2cee107cd4cce3593cbb8bcf
 metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
 ngtcp2;https://github.com/status-im/nim-ngtcp2@#9456daa178c655bccd4a3c78ad3b8cce1f0add73
-nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
-quic;https://github.com/status-im/nim-quic.git@#d54e8f0f2e454604b767fadeae243d95c30c383f
-results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad
-secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be
-serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35
-stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e
-testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
-unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c
+nimcrypto;https://github.com/cheatfate/nimcrypto@#19c41d6be4c00b4a2c8000583bd30cf8ceb5f4b1
+quic;https://github.com/status-im/nim-quic.git@#a6c30263c95fc5ddb2ef4d197c09b282555c06b0
+results;https://github.com/arnetheduck/nim-results@#df8113dda4c2d74d460a8fa98252b0b771bf1f27
+secp256k1;https://github.com/status-im/nim-secp256k1@#f808ed5e7a7bfc42204ec7830f14b7a42b63c284
+serialization;https://github.com/status-im/nim-serialization@#548d0adc9797a10b2db7f788b804330306293088
+stew;https://github.com/status-im/nim-stew@#0db179256cf98eb9ce9ee7b9bc939f219e621f77
+testutils;https://github.com/status-im/nim-testutils@#9e842bd58420d23044bc55e16088e8abbe93ce51
+unittest2;https://github.com/status-im/nim-unittest2@#8b51e99b4a57fcfb31689230e75595f024543024
 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982
-zlib;https://github.com/status-im/nim-zlib@#38b72eda9d70067df4a953f56b5ed59630f2a17b
+zlib;https://github.com/status-im/nim-zlib@#daa8723fd32299d4ca621c837430c29a5a11e19a
README.md (21 changed lines)

@@ -20,7 +20,7 @@
 - [Background](#background)
 - [Install](#install)
 - [Getting Started](#getting-started)
-- [Go-libp2p-daemon](#go-libp2p-daemon)
+- [Testing](#testing)
 - [Modules](#modules)
 - [Users](#users)
 - [Stability](#stability)
@@ -33,22 +33,22 @@
 ## Background
 libp2p is a [Peer-to-Peer](https://en.wikipedia.org/wiki/Peer-to-peer) networking stack, with [implementations](https://github.com/libp2p/libp2p#implementations) in multiple languages derived from the same [specifications.](https://github.com/libp2p/specs)

-Building large scale peer-to-peer systems has been complex and difficult in the last 15 years and libp2p is a way to fix that. It's striving to be a modular stack, with sane and secure defaults, useful protocols, while remain open and extensible.
-This implementation in native Nim, relying on [chronos](https://github.com/status-im/nim-chronos) for async. It's used in production by a few [projects](#users)
+Building large scale peer-to-peer systems has been complex and difficult in the last 15 years and libp2p is a way to fix that. It strives to be a modular stack with secure defaults and useful protocols, while remaining open and extensible.
+This is a native Nim implementation, using [chronos](https://github.com/status-im/nim-chronos) for asynchronous execution. It's used in production by a few [projects](#users)

 Learn more about libp2p at [**libp2p.io**](https://libp2p.io) and follow libp2p's documentation [**docs.libp2p.io**](https://docs.libp2p.io).

 ## Install
 **Prerequisite**
 - [Nim](https://nim-lang.org/install.html)
-> The currently supported Nim version is 1.6.18.
+> The currently supported Nim versions are 1.6, 2.0 and 2.2.

 ```
 nimble install libp2p
 ```

 ## Getting Started
-You'll find the nim-libp2p documentation [here](https://vacp2p.github.io/nim-libp2p/docs/).
+You'll find the nim-libp2p documentation [here](https://vacp2p.github.io/nim-libp2p/docs/). See [examples](./examples) for simple usage patterns.

 ### Testing
 Remember you'll need to build the `go-libp2p-daemon` binary to run the `nim-libp2p` tests.
@@ -80,10 +80,10 @@ List of packages modules implemented in nim-libp2p:
 | [libp2p-yamux](libp2p/muxers/yamux/yamux.nim) | [Yamux](https://docs.libp2p.io/concepts/multiplex/yamux/) multiplexer |
 | **Data Types** | |
 | [peer-id](libp2p/peerid.nim) | [Cryptographic identifiers](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-id) |
-| [peer-store](libp2p/peerstore.nim) | ["Address book" of known peers](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-store) |
+| [peer-store](libp2p/peerstore.nim) | [Address book of known peers](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-store) |
 | [multiaddress](libp2p/multiaddress.nim) | [Composable network addresses](https://github.com/multiformats/multiaddr) |
-| [signed envelope](libp2p/signed_envelope.nim) | [Signed generic data container](https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md) |
-| [routing record](libp2p/routing_record.nim) | [Signed peer dialing informations](https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md) |
+| [signed-envelope](libp2p/signed_envelope.nim) | [Signed generic data container](https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md) |
+| [routing-record](libp2p/routing_record.nim) | [Signed peer dialing informations](https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md) |
 | [discovery manager](libp2p/discovery/discoverymngr.nim) | Discovery Manager |
 | **Utilities** | |
 | [libp2p-crypto](libp2p/crypto) | Cryptographic backend |
@@ -156,6 +156,11 @@ The code follows the [Status Nim Style Guide](https://status-im.github.io/nim-st

 ### Compile time flags

+Enable quic transport support
+```bash
+nim c -d:libp2p_quic_support some_file.nim
+```
+
 Enable expensive metrics (ie, metrics with per-peer cardinality):
 ```bash
 nim c -d:libp2p_expensive_metrics some_file.nim
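To go with the Getting Started pointer above, here is a minimal sketch, not taken from the README or the examples directory, of two in-process switches where one pings the other. It assumes the public `newStandardSwitch` constructor plus the `Ping` protocol and `PingCodec` identifiers that also appear later in this diff.

```nim
import chronos
import libp2p
import libp2p/protocols/ping

proc main() {.async.} =
  let pingProto = Ping.new()

  # Two switches in one process; `b` serves the ping protocol.
  let a = newStandardSwitch()
  let b = newStandardSwitch()
  b.mount(pingProto)

  await a.start()
  await b.start()

  # Dial `b` on the ping codec and measure a single round trip.
  let conn = await a.dial(b.peerInfo.peerId, b.peerInfo.addrs, PingCodec)
  let rtt = await pingProto.ping(conn)
  echo "ping round trip: ", rtt

  await conn.close()
  await a.stop()
  await b.stop()

waitFor main()
```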
@@ -52,7 +52,6 @@ else:
 stream/connection,
 transports/transport,
 transports/tcptransport,
-transports/quictransport,
 protocols/secure/noise,
 cid,
 multihash,
@@ -71,3 +70,7 @@ else:
 minprotobuf, switch, peerid, peerinfo, connection, multiaddress, crypto, lpstream,
 bufferstream, muxer, mplex, transport, tcptransport, noise, errors, cid, multihash,
 multicodec, builders, pubsub
+
+when defined(libp2p_quic_support):
+  import libp2p/transports/quictransport
+  export quictransport
@@ -1,17 +1,18 @@
 mode = ScriptMode.Verbose

 packageName = "libp2p"
-version = "1.10.0"
+version = "1.10.1"
 author = "Status Research & Development GmbH"
 description = "LibP2P implementation"
 license = "MIT"
 skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
 # test

 requires "nim >= 1.6.0",
 "nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
-"chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew#head",
+"chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew >= 0.4.0",
 "websock", "unittest2", "results",
-"https://github.com/status-im/nim-quic.git#d54e8f0f2e454604b767fadeae243d95c30c383f"
+"https://github.com/status-im/nim-quic.git#a6c30263c95fc5ddb2ef4d197c09b282555c06b0"

 let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
 let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)
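The `requires` change above replaces a branch-tracking pin (`stew#head`) with a version range. As an illustration only, a hypothetical downstream package file mixing the two requirement styles; the package name, author and version below are made up:

```nim
# example.nimble -- hypothetical consumer package, illustrative only
version = "0.1.0"
author = "example"
description = "Example consumer of nim-libp2p"
license = "MIT"

requires "nim >= 1.6.0",
  "libp2p >= 1.10.1", # version-range pin, like the new "stew >= 0.4.0"
  "https://github.com/status-im/nim-quic.git#a6c30263c95fc5ddb2ef4d197c09b282555c06b0" # exact-commit pin, like the quic dependency
```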
@@ -23,7 +23,7 @@ import
 stream/connection,
 multiaddress,
 crypto/crypto,
-transports/[transport, tcptransport, quictransport, memorytransport],
+transports/[transport, tcptransport, memorytransport],
 muxers/[muxer, mplex/mplex, yamux/yamux],
 protocols/[identify, secure/secure, secure/noise, rendezvous],
 protocols/connectivity/[autonat/server, relay/relay, relay/client, relay/rtransport],
@@ -169,11 +169,14 @@ proc withTcpTransport*(
       TcpTransport.new(flags, upgr)
   )

-proc withQuicTransport*(b: SwitchBuilder): SwitchBuilder {.public.} =
-  b.withTransport(
-    proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
-      QuicTransport.new(upgr, privateKey)
-  )
+when defined(libp2p_quic_support):
+  import transports/quictransport
+
+  proc withQuicTransport*(b: SwitchBuilder): SwitchBuilder {.public.} =
+    b.withTransport(
+      proc(upgr: Upgrade, privateKey: PrivateKey): Transport =
+        QuicTransport.new(upgr, privateKey)
+    )

 proc withMemoryTransport*(b: SwitchBuilder): SwitchBuilder {.public.} =
   b.withTransport(
||||
@@ -124,9 +124,13 @@ proc expandDnsAddr(
|
||||
for resolvedAddress in resolved:
|
||||
let lastPart = resolvedAddress[^1].tryGet()
|
||||
if lastPart.protoCode == Result[MultiCodec, string].ok(multiCodec("p2p")):
|
||||
let
|
||||
var peerIdBytes: seq[byte]
|
||||
try:
|
||||
peerIdBytes = lastPart.protoArgument().tryGet()
|
||||
addrPeerId = PeerId.init(peerIdBytes).tryGet()
|
||||
except ResultError[string]:
|
||||
raiseAssert "expandDnsAddr failed in protoArgument: " & getCurrentExceptionMsg()
|
||||
|
||||
let addrPeerId = PeerId.init(peerIdBytes).tryGet()
|
||||
result.add((resolvedAddress[0 ..^ 2].tryGet(), Opt.some(addrPeerId)))
|
||||
else:
|
||||
result.add((resolvedAddress, peerId))
|
||||
|
||||
@@ -9,7 +9,7 @@

 {.push raises: [].}

-import stew/results
+import results
 import chronos, chronicles
 import ../../../switch, ../../../multiaddress, ../../../peerid
 import core
@@ -11,7 +11,7 @@

 import std/sequtils

-import stew/results
+import results
 import chronos, chronicles

 import core
@@ -10,8 +10,8 @@
 {.push raises: [].}

 import std/[sets, sequtils]
-import stew/[results, objects]
-import chronos, chronicles
+import stew/objects
+import results, chronos, chronicles

 import core
 import
@@ -29,7 +29,7 @@ import
 ../../utility,
 ../../switch

-import stew/results
+import results
 export results

 import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
@@ -31,7 +31,7 @@ import
 ../../errors,
 ../../utility

-import stew/results
+import results
 export results

 export tables, sets
@@ -9,8 +9,8 @@

 {.push raises: [].}

-import std/[sequtils, strutils, tables, hashes, options, sets, deques]
-import stew/results
+import std/[sequtils, tables, hashes, options, sets, deques]
+import results
 import chronos, chronicles, nimcrypto/sha2, metrics
 import chronos/ratelimit
 import
@@ -10,7 +10,7 @@
 {.push raises: [].}

 import std/[hashes, sets]
-import chronos/timer, stew/results
+import chronos/timer, results

 import ../../utility

@@ -12,7 +12,7 @@
 {.push raises: [].}

 import std/[sequtils]
-import stew/results
+import results
 import chronos, chronicles
 import
 transport,
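The hunks above all make the same mechanical change: `nim-results` is now imported as the standalone `results` module instead of via `stew/results` (it is pinned as its own package in `.pinned`). A small self-contained sketch of the `Result` API involved; the `parsePort` helper is made up for illustration:

```nim
import std/strutils
import results # previously reached as stew/results

proc parsePort(s: string): Result[int, string] =
  # ok/err bind to the proc's Result return type.
  let port =
    try:
      parseInt(s)
    except ValueError:
      return err("not a port number: " & s)
  ok(port)

let port = parsePort("4001")
if port.isOk:
  echo "port: ", port.get()
else:
  echo "error: ", port.error()
```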
@@ -1,4 +1,3 @@
-import options, tables
 import chronos, chronicles, stew/byteutils
 import helpers
 import ../libp2p
@@ -7,7 +7,7 @@ COPY .pinned libp2p.nimble nim-libp2p/

 RUN --mount=type=cache,target=/var/cache/apt apt-get update && apt-get install -y libssl-dev

-RUN cd nim-libp2p && nimble install_pinned && nimble install redis -y
+RUN cd nim-libp2p && nimble install_pinned && nimble install "redis@#b341fe240dbf11c544011dd0e033d3c3acca56af" -y

 COPY . nim-libp2p/
@@ -18,6 +18,9 @@ import
import ../stubs/autonatclientstub
import ../errorhelpers

logScope:
topics = "hp interop node"

proc createSwitch(r: Relay = nil, hpService: Service = nil): Switch =
let rng = newRng()
var builder = SwitchBuilder
@@ -41,86 +44,96 @@ proc createSwitch(r: Relay = nil, hpService: Service = nil): Switch =
return s

proc main() {.async.} =
let relayClient = RelayClient.new()
let autoRelayService = AutoRelayService.new(1, relayClient, nil, newRng())
let autonatClientStub = AutonatClientStub.new(expectedDials = 1)
autonatClientStub.answer = NotReachable
let autonatService = AutonatService.new(autonatClientStub, newRng(), maxQueueSize = 1)
let hpservice = HPService.new(autonatService, autoRelayService)

let
isListener = getEnv("MODE") == "listen"
switch = createSwitch(relayClient, hpservice)
auxSwitch = createSwitch()
redisClient = open("redis", 6379.Port)

debug "Connected to redis"

await switch.start()
await auxSwitch.start()

let relayAddr =
try:
redisClient.bLPop(@["RELAY_TCP_ADDRESS"], 0)
except Exception as e:
raise newException(CatchableError, e.msg)

debug "All relay addresses", relayAddr

# This is necessary to make the autonat service work. It will ask this peer for our reachability which the autonat
# client stub will answer NotReachable.
await switch.connect(auxSwitch.peerInfo.peerId, auxSwitch.peerInfo.addrs)

# Wait for autonat to be NotReachable
while autonatService.networkReachability != NetworkReachability.NotReachable:
await sleepAsync(100.milliseconds)

# This will trigger the autonat relay service to make a reservation.
let relayMA = MultiAddress.init(relayAddr[1]).tryGet()

try:
let relayClient = RelayClient.new()
let autoRelayService = AutoRelayService.new(1, relayClient, nil, newRng())
let autonatClientStub = AutonatClientStub.new(expectedDials = 1)
autonatClientStub.answer = NotReachable
let autonatService =
AutonatService.new(autonatClientStub, newRng(), maxQueueSize = 1)
let hpservice = HPService.new(autonatService, autoRelayService)
debug "Dialing relay...", relayMA
let relayId = await switch.connect(relayMA).wait(30.seconds)
debug "Connected to relay", relayId
except AsyncTimeoutError:
raise newException(CatchableError, "Connection to relay timed out")

let
isListener = getEnv("MODE") == "listen"
switch = createSwitch(relayClient, hpservice)
auxSwitch = createSwitch()
redisClient = open("redis", 6379.Port)
# Wait for our relay address to be published
while not switch.peerInfo.addrs.anyIt(it.contains(multiCodec("p2p-circuit")).tryGet()):
await sleepAsync(100.milliseconds)

debug "Connected to redis"
if isListener:
let listenerPeerId = switch.peerInfo.peerId
discard redisClient.rPush("LISTEN_CLIENT_PEER_ID", $listenerPeerId)
debug "Pushed listener client peer id to redis", listenerPeerId

await switch.start()
await auxSwitch.start()

let relayAddr =
# Nothing to do anymore, wait to be killed
await sleepAsync(2.minutes)
else:
let listenerId =
try:
redisClient.bLPop(@["RELAY_TCP_ADDRESS"], 0)
PeerId.init(redisClient.bLPop(@["LISTEN_CLIENT_PEER_ID"], 0)[1]).tryGet()
except Exception as e:
raise newException(CatchableError, e.msg)

# This is necessary to make the autonat service work. It will ask this peer for our reachability which the autonat
# client stub will answer NotReachable.
await switch.connect(auxSwitch.peerInfo.peerId, auxSwitch.peerInfo.addrs)
debug "Got listener peer id", listenerId
let listenerRelayAddr = MultiAddress.init($relayMA & "/p2p-circuit").tryGet()

# Wait for autonat to be NotReachable
while autonatService.networkReachability != NetworkReachability.NotReachable:
await sleepAsync(100.milliseconds)
debug "Dialing listener relay address", listenerRelayAddr
await switch.connect(listenerId, @[listenerRelayAddr])

# This will trigger the autonat relay service to make a reservation.
let relayMA = MultiAddress.init(relayAddr[1]).tryGet()
debug "Got relay address", relayMA
let relayId = await switch.connect(relayMA)
debug "Connected to relay", relayId
# wait for hole-punching to complete in the background
await sleepAsync(5000.milliseconds)

# Wait for our relay address to be published
while not switch.peerInfo.addrs.anyIt(
it.contains(multiCodec("p2p-circuit")).tryGet()
let conn = switch.connManager.selectMuxer(listenerId).connection
let channel = await switch.dial(listenerId, @[listenerRelayAddr], PingCodec)
let delay = await Ping.new().ping(channel)
await allFuturesThrowing(
channel.close(), conn.close(), switch.stop(), auxSwitch.stop()
)
:
await sleepAsync(100.milliseconds)
echo &"""{{"rtt_to_holepunched_peer_millis":{delay.millis}}}"""

if isListener:
let listenerPeerId = switch.peerInfo.peerId
discard redisClient.rPush("LISTEN_CLIENT_PEER_ID", $listenerPeerId)
debug "Pushed listener client peer id to redis", listenerPeerId
try:
proc mainAsync(): Future[string] {.async.} =
# mainAsync wraps main and returns some value, as otherwise
# 'waitFor(fut)' has no type (or is ambiguous)
await main()
return "done"

# Nothing to do anymore, wait to be killed
await sleepAsync(2.minutes)
else:
let listenerId =
try:
PeerId.init(redisClient.bLPop(@["LISTEN_CLIENT_PEER_ID"], 0)[1]).tryGet()
except Exception as e:
raise newException(CatchableError, e.msg)

debug "Got listener peer id", listenerId
let listenerRelayAddr = MultiAddress.init($relayMA & "/p2p-circuit").tryGet()

debug "Dialing listener relay address", listenerRelayAddr
await switch.connect(listenerId, @[listenerRelayAddr])

# wait for hole-punching to complete in the background
await sleepAsync(5000.milliseconds)

let conn = switch.connManager.selectMuxer(listenerId).connection
let channel = await switch.dial(listenerId, @[listenerRelayAddr], PingCodec)
let delay = await Ping.new().ping(channel)
await allFuturesThrowing(
channel.close(), conn.close(), switch.stop(), auxSwitch.stop()
)
echo &"""{{"rtt_to_holepunched_peer_millis":{delay.millis}}}"""
quit(0)
except CatchableError as e:
error "Unexpected error", description = e.msg

discard waitFor(main().withTimeout(4.minutes))
quit(1)
discard waitFor(mainAsync().wait(4.minutes))
except AsyncTimeoutError:
error "Program execution timed out."
quit(-1)
except CatchableError as e:
error "Unexpected error", description = e.msg
quit(-1)
@@ -9,12 +9,11 @@
|
||||
|
||||
{.used.}
|
||||
|
||||
import sequtils, options, tables, sets
|
||||
import sequtils, tables, sets
|
||||
import chronos, stew/byteutils
|
||||
import
|
||||
utils,
|
||||
../../libp2p/[
|
||||
errors,
|
||||
switch,
|
||||
stream/connection,
|
||||
crypto/crypto,
|
||||
|
||||
@@ -22,88 +22,44 @@ suite "GossipSub Fanout Management":
|
||||
checkTrackers()
|
||||
|
||||
asyncTest "`replenishFanout` Degree Lo":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
for i in 0 ..< 15:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
var peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
gossipSub.gossipsub[topic].incl(peer)
|
||||
let (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
check gossipSub.gossipsub[topic].len == 15
|
||||
gossipSub.replenishFanout(topic)
|
||||
check gossipSub.fanout[topic].len == gossipSub.parameters.d
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "`dropFanoutPeers` drop expired fanout topics":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
|
||||
await sleepAsync(5.millis) # allow the topic to expire
|
||||
let (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(6, topic, populateGossipsub = true, populateFanout = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
for i in 0 ..< 6:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
gossipSub.fanout[topic].incl(peer)
|
||||
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
|
||||
await sleepAsync(5.millis) # allow the topic to expire
|
||||
|
||||
check gossipSub.fanout[topic].len == gossipSub.parameters.d
|
||||
|
||||
gossipSub.dropFanoutPeers()
|
||||
check topic notin gossipSub.fanout
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
let
|
||||
topic1 = "foobar1"
|
||||
topic2 = "foobar2"
|
||||
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
|
||||
6, @[topic1, topic2], populateGossipsub = true, populateFanout = true
|
||||
)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let topic1 = "foobar1"
|
||||
let topic2 = "foobar2"
|
||||
gossipSub.topicParams[topic1] = TopicParams.init()
|
||||
gossipSub.topicParams[topic2] = TopicParams.init()
|
||||
gossipSub.fanout[topic1] = initHashSet[PubSubPeer]()
|
||||
gossipSub.fanout[topic2] = initHashSet[PubSubPeer]()
|
||||
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis)
|
||||
gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes)
|
||||
await sleepAsync(5.millis) # allow the topic to expire
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
for i in 0 ..< 6:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
gossipSub.fanout[topic1].incl(peer)
|
||||
gossipSub.fanout[topic2].incl(peer)
|
||||
await sleepAsync(5.millis) # allow first topic to expire
|
||||
|
||||
check gossipSub.fanout[topic1].len == gossipSub.parameters.d
|
||||
check gossipSub.fanout[topic2].len == gossipSub.parameters.d
|
||||
@@ -112,14 +68,8 @@ suite "GossipSub Fanout Management":
|
||||
check topic1 notin gossipSub.fanout
|
||||
check topic2 in gossipSub.fanout
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "e2e - GossipSub send over fanout A -> B":
|
||||
var passed = newFuture[void]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||
check topic == "foobar"
|
||||
passed.complete()
|
||||
let (passed, handler) = createCompleteHandler()
|
||||
|
||||
let nodes = generateNodes(2, gossip = true)
|
||||
|
||||
@@ -153,15 +103,12 @@ suite "GossipSub Fanout Management":
|
||||
gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId)
|
||||
not gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId)
|
||||
|
||||
await passed.wait(2.seconds)
|
||||
discard await passed.wait(2.seconds)
|
||||
|
||||
check observed == 2
|
||||
|
||||
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
|
||||
var passed = newFuture[void]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||
check topic == "foobar"
|
||||
passed.complete()
|
||||
let (passed, handler) = createCompleteHandler()
|
||||
|
||||
let nodes = generateNodes(2, gossip = true, unsubscribeBackoff = 10.minutes)
|
||||
|
||||
@@ -191,6 +138,6 @@ suite "GossipSub Fanout Management":
|
||||
GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0
|
||||
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
|
||||
|
||||
await passed.wait(2.seconds)
|
||||
discard await passed.wait(2.seconds)
|
||||
|
||||
trace "test done, stopping..."
|
||||
|
||||
@@ -24,26 +24,14 @@ suite "GossipSub Gossip Protocol":
|
||||
checkTrackers()
|
||||
|
||||
asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
var conns = newSeq[Connection]()
|
||||
let (gossipSub, conns, peers) = setupGossipSubWithPeers(45, topic)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
# generate mesh and fanout peers
|
||||
# generate mesh and fanout peers
|
||||
for i in 0 ..< 30:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
let peer = peers[i]
|
||||
if i mod 2 == 0:
|
||||
gossipSub.fanout[topic].incl(peer)
|
||||
else:
|
||||
@@ -51,57 +39,36 @@ suite "GossipSub Gossip Protocol":
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
|
||||
# generate gossipsub (free standing) peers
|
||||
for i in 0 ..< 15:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
for i in 30 ..< 45:
|
||||
let peer = peers[i]
|
||||
gossipSub.gossipsub[topic].incl(peer)
|
||||
|
||||
# generate messages
|
||||
var seqno = 0'u64
|
||||
for i in 0 .. 5:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let conn = conns[i]
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
|
||||
|
||||
check gossipSub.fanout[topic].len == 15
|
||||
check gossipSub.mesh[topic].len == 15
|
||||
check gossipSub.gossipsub[topic].len == 15
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == gossipSub.parameters.d
|
||||
for p in peers.keys:
|
||||
let gossipPeers = gossipSub.getGossipPeers()
|
||||
check gossipPeers.len == gossipSub.parameters.d
|
||||
for p in gossipPeers.keys:
|
||||
check not gossipSub.fanout.hasPeerId(topic, p.peerId)
|
||||
check not gossipSub.mesh.hasPeerId(topic, p.peerId)
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
var conns = newSeq[Connection]()
|
||||
for i in 0 ..< 30:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
# generate mesh and fanout peers
|
||||
for i, peer in peers:
|
||||
if i mod 2 == 0:
|
||||
gossipSub.fanout[topic].incl(peer)
|
||||
else:
|
||||
@@ -110,38 +77,22 @@ suite "GossipSub Gossip Protocol":
|
||||
# generate messages
|
||||
var seqno = 0'u64
|
||||
for i in 0 .. 5:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let conn = conns[i]
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == gossipSub.parameters.d
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
let gossipPeers = gossipSub.getGossipPeers()
|
||||
check gossipPeers.len == gossipSub.parameters.d
|
||||
|
||||
asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
var conns = newSeq[Connection]()
|
||||
for i in 0 ..< 30:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
# generate mesh and fanout peers
|
||||
for i, peer in peers:
|
||||
if i mod 2 == 0:
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
gossipSub.grafted(peer, topic)
|
||||
@@ -151,38 +102,22 @@ suite "GossipSub Gossip Protocol":
|
||||
# generate messages
|
||||
var seqno = 0'u64
|
||||
for i in 0 .. 5:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let conn = conns[i]
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == gossipSub.parameters.d
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
let gossipPeers = gossipSub.getGossipPeers()
|
||||
check gossipPeers.len == gossipSub.parameters.d
|
||||
|
||||
asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
|
||||
var conns = newSeq[Connection]()
|
||||
for i in 0 ..< 30:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
let (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
# generate mesh and fanout peers
|
||||
for i, peer in peers:
|
||||
if i mod 2 == 0:
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
gossipSub.grafted(peer, topic)
|
||||
@@ -192,47 +127,22 @@ suite "GossipSub Gossip Protocol":
|
||||
# generate messages
|
||||
var seqno = 0'u64
|
||||
for i in 0 .. 5:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let conn = conns[i]
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
|
||||
let msg = Message.init(conn.peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == 0
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
let gossipPeers = gossipSub.getGossipPeers()
|
||||
check gossipPeers.len == 0
|
||||
|
||||
asyncTest "handleIHave/Iwant tests":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||
check false
|
||||
|
||||
proc handler2(topic: string, data: seq[byte]) {.async.} =
|
||||
discard
|
||||
|
||||
let topic = "foobar"
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.subscribe(topic, handler2)
|
||||
var (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(30, topic, populateMesh = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
# Instantiates 30 peers and connects all of them to the previously defined `gossipSub`
|
||||
for i in 0 ..< 30:
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
# Add the connection to `gossipSub`, to their `gossipSub.gossipsub` and `gossipSub.mesh` tables
|
||||
gossipSub.grafted(peer, topic)
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
gossipSub.subscribe(topic, voidTopicHandler)
|
||||
|
||||
# Peers with no budget should not request messages
|
||||
block:
|
||||
@@ -296,9 +206,6 @@ suite "GossipSub Gossip Protocol":
|
||||
|
||||
check gossipSub.mcache.msgs.len == 1
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "messages sent to peers not in the mesh are propagated via gossip":
|
||||
let
|
||||
numberOfNodes = 5
|
||||
@@ -715,3 +622,130 @@ suite "GossipSub Gossip Protocol":
|
||||
gossip0.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
|
||||
checkUntilTimeout:
|
||||
gossip1.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
|
||||
|
||||
asyncTest "IHAVE messages correctly advertise message ID to peers":
|
||||
# Given 2 nodes
|
||||
let
|
||||
topic = "foo"
|
||||
messageID = @[0'u8, 1, 2, 3]
|
||||
ihaveMessage =
|
||||
ControlMessage(ihave: @[ControlIHave(topicID: topic, messageIDs: @[messageID])])
|
||||
numberOfNodes = 2
|
||||
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
|
||||
.toGossipSub()
|
||||
n0 = nodes[0]
|
||||
n1 = nodes[1]
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# Given node1 has an IHAVE observer
|
||||
var receivedIHave = newFuture[(string, seq[MessageId])]()
|
||||
let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||
if msgs.control.isSome:
|
||||
for msg in msgs.control.get.ihave:
|
||||
receivedIHave.complete((msg.topicID, msg.messageIDs))
|
||||
n1.addObserver(PubSubObserver(onRecv: checkForIhaves))
|
||||
|
||||
# And the nodes are connected
|
||||
await connectNodesStar(nodes)
|
||||
|
||||
# And both subscribe to the topic
|
||||
subscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
check:
|
||||
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
# When an IHAVE message is sent
|
||||
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
n0.broadcast(@[p1], RPCMsg(control: some(ihaveMessage)), isHighPriority = false)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then the peer has the message ID
|
||||
let r = await receivedIHave.waitForState(HEARTBEAT_TIMEOUT)
|
||||
check:
|
||||
r.isCompleted((topic, @[messageID]))
|
||||
|
||||
asyncTest "IWANT messages correctly request messages by their IDs":
|
||||
# Given 2 nodes
|
||||
let
|
||||
topic = "foo"
|
||||
messageID = @[0'u8, 1, 2, 3]
|
||||
iwantMessage = ControlMessage(iwant: @[ControlIWant(messageIDs: @[messageID])])
|
||||
numberOfNodes = 2
|
||||
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
|
||||
.toGossipSub()
|
||||
n0 = nodes[0]
|
||||
n1 = nodes[1]
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# Given node1 has an IWANT observer
|
||||
var receivedIWant = newFuture[seq[MessageId]]()
|
||||
let checkForIwants = proc(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||
if msgs.control.isSome:
|
||||
for msg in msgs.control.get.iwant:
|
||||
receivedIWant.complete(msg.messageIDs)
|
||||
n1.addObserver(PubSubObserver(onRecv: checkForIwants))
|
||||
|
||||
# And the nodes are connected
|
||||
await connectNodesStar(nodes)
|
||||
|
||||
# And both subscribe to the topic
|
||||
subscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
check:
|
||||
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
# When an IWANT message is sent
|
||||
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
n0.broadcast(@[p1], RPCMsg(control: some(iwantMessage)), isHighPriority = false)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then the peer has the message ID
|
||||
let r = await receivedIWant.waitForState(HEARTBEAT_TIMEOUT)
|
||||
check:
|
||||
r.isCompleted(@[messageID])
|
||||
|
||||
asyncTest "IHAVE for message not held by peer triggers IWANT response to sender":
|
||||
# Given 2 nodes
|
||||
let
|
||||
topic = "foo"
|
||||
messageID = @[0'u8, 1, 2, 3]
|
||||
ihaveMessage =
|
||||
ControlMessage(ihave: @[ControlIHave(topicID: topic, messageIDs: @[messageID])])
|
||||
numberOfNodes = 2
|
||||
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
|
||||
.toGossipSub()
|
||||
n0 = nodes[0]
|
||||
n1 = nodes[1]
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# Given node1 has an IWANT observer
|
||||
var receivedIWant = newFuture[seq[MessageId]]()
|
||||
let checkForIwants = proc(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||
if msgs.control.isSome:
|
||||
for msg in msgs.control.get.iwant:
|
||||
receivedIWant.complete(msg.messageIDs)
|
||||
n0.addObserver(PubSubObserver(onRecv: checkForIwants))
|
||||
|
||||
# And the nodes are connected
|
||||
await connectNodesStar(nodes)
|
||||
|
||||
# And both nodes subscribe to the topic
|
||||
subscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# When an IHAVE message is sent from node0
|
||||
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
n0.broadcast(@[p1], RPCMsg(control: some(ihaveMessage)), isHighPriority = false)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then node0 should receive an IWANT message from node1 (as node1 doesn't have the message)
|
||||
let iWantResult = await receivedIWant.waitForState(HEARTBEAT_TIMEOUT)
|
||||
check:
|
||||
iWantResult.isCompleted(@[messageID])
|
||||
|
||||
@@ -25,28 +25,14 @@ suite "GossipSub Mesh Management":
|
||||
params.validateParameters().tryGet()
|
||||
|
||||
asyncTest "subscribe/unsubscribeAll":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
|
||||
discard
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
for i in 0 ..< 15:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.sendConn = conn
|
||||
gossipSub.gossipsub[topic].incl(peer)
|
||||
let (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
# test via dynamic dispatch
|
||||
gossipSub.PubSub.subscribe(topic, handler)
|
||||
gossipSub.PubSub.subscribe(topic, voidTopicHandler)
|
||||
|
||||
check:
|
||||
gossipSub.topics.contains(topic)
|
||||
@@ -61,53 +47,27 @@ suite "GossipSub Mesh Management":
|
||||
topic notin gossipSub.mesh # not in mesh
|
||||
topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "`rebalanceMesh` Degree Lo":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
for i in 0 ..< 15:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.sendConn = conn
|
||||
gossipSub.gossipsub[topic].incl(peer)
|
||||
let (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
check gossipSub.peers.len == 15
|
||||
gossipSub.rebalanceMesh(topic)
|
||||
check gossipSub.mesh[topic].len == gossipSub.parameters.d
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "rebalanceMesh - bad peers":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
let (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
var scoreLow = -11'f64
|
||||
for i in 0 ..< 15:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.sendConn = conn
|
||||
for peer in peers:
|
||||
peer.score = scoreLow
|
||||
gossipSub.gossipsub[topic].incl(peer)
|
||||
scoreLow += 1.0
|
||||
|
||||
check gossipSub.peers.len == 15
|
||||
@@ -117,54 +77,28 @@ suite "GossipSub Mesh Management":
|
||||
for peer in gossipSub.mesh[topic]:
|
||||
check peer.score >= 0.0
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "`rebalanceMesh` Degree Hi":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
for i in 0 ..< 15:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
gossipSub.grafted(peer, topic)
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
let (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
check gossipSub.mesh[topic].len == 15
|
||||
gossipSub.rebalanceMesh(topic)
|
||||
check gossipSub.mesh[topic].len ==
|
||||
gossipSub.parameters.d + gossipSub.parameters.dScore
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "rebalanceMesh fail due to backoff":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
let topic = "foobar"
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
for i in 0 ..< 15:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.sendConn = conn
|
||||
gossipSub.gossipsub[topic].incl(peer)
|
||||
let (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(15, topic, populateGossipsub = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
for peer in peers:
|
||||
gossipSub.backingOff.mgetOrPut(topic, initTable[PeerId, Moment]()).add(
|
||||
peerId, Moment.now() + 1.hours
|
||||
peer.peerId, Moment.now() + 1.hours
|
||||
)
|
||||
let prunes = gossipSub.handleGraft(peer, @[ControlGraft(topicID: topic)])
|
||||
# there must be a control prune due to violation of backoff
|
||||
@@ -175,34 +109,18 @@ suite "GossipSub Mesh Management":
|
||||
# expect 0 since they are all backing off
|
||||
check gossipSub.mesh[topic].len == 0
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "rebalanceMesh fail due to backoff - remote":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
let topic = "foobar"
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
for i in 0 ..< 15:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.sendConn = conn
|
||||
gossipSub.gossipsub[topic].incl(peer)
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
let (gossipSub, conns, peers) =
|
||||
setupGossipSubWithPeers(15, topic, populateGossipsub = true, populateMesh = true)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
check gossipSub.peers.len == 15
|
||||
gossipSub.rebalanceMesh(topic)
|
||||
check gossipSub.mesh[topic].len != 0
|
||||
|
||||
for i in 0 ..< 15:
|
||||
let peerId = conns[i].peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
for peer in peers:
|
||||
gossipSub.handlePrune(
|
||||
peer,
|
||||
@[
|
||||
@@ -217,45 +135,36 @@ suite "GossipSub Mesh Management":
|
||||
# expect topic cleaned up since they are all pruned
|
||||
check topic notin gossipSub.mesh
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "rebalanceMesh Degree Hi - audit scenario":
|
||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||
let topic = "foobar"
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.topicParams[topic] = TopicParams.init()
|
||||
let
|
||||
topic = "foobar"
|
||||
numInPeers = 6
|
||||
numOutPeers = 7
|
||||
totalPeers = numInPeers + numOutPeers
|
||||
|
||||
let (gossipSub, conns, peers) = setupGossipSubWithPeers(
|
||||
totalPeers, topic, populateGossipsub = true, populateMesh = true
|
||||
)
|
||||
defer:
|
||||
await teardownGossipSub(gossipSub, conns)
|
||||
|
||||
gossipSub.parameters.dScore = 4
|
||||
gossipSub.parameters.d = 6
|
||||
gossipSub.parameters.dOut = 3
|
||||
gossipSub.parameters.dHigh = 12
|
||||
gossipSub.parameters.dLow = 4
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||
for i in 0 ..< 6:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
for i in 0 ..< numInPeers:
|
||||
let conn = conns[i]
|
||||
let peer = peers[i]
|
||||
conn.transportDir = Direction.In
|
||||
conns &= conn
|
||||
let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.score = 40.0
|
||||
peer.sendConn = conn
|
||||
gossipSub.grafted(peer, topic)
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
|
||||
for i in 0 ..< 7:
|
||||
let conn = TestBufferStream.new(noop)
|
||||
for i in numInPeers ..< totalPeers:
|
||||
let conn = conns[i]
|
||||
let peer = peers[i]
|
||||
conn.transportDir = Direction.Out
|
||||
conns &= conn
|
||||
let peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.score = 10.0
|
||||
peer.sendConn = conn
|
||||
gossipSub.grafted(peer, topic)
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
|
||||
check gossipSub.mesh[topic].len == 13
|
||||
gossipSub.rebalanceMesh(topic)
|
||||
@@ -268,9 +177,6 @@ suite "GossipSub Mesh Management":
|
||||
# ensure we give priority and keep at least dOut outbound peers
|
||||
check outbound >= gossipSub.parameters.dOut
|
||||
|
||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||
await gossipSub.switch.stop()
|
||||
|
||||
asyncTest "dont prune peers if mesh len is less than d_high":
|
||||
let
|
||||
numberOfNodes = 5
|
||||
@@ -520,3 +426,264 @@ suite "GossipSub Mesh Management":
|
||||
nodes[1].subscribe("foobar", handler)
|
||||
|
||||
await invalidDetected.wait(10.seconds)
|
||||
|
||||
asyncTest "mesh and gossipsub updated when topic subscribed and unsubscribed":
|
||||
let
|
||||
numberOfNodes = 5
|
||||
topic = "foobar"
|
||||
nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# When all of them are connected and subscribed to the same topic
|
||||
await connectNodesStar(nodes)
|
||||
subscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then mesh and gossipsub should be populated
|
||||
for node in nodes:
|
||||
check node.topics.contains(topic)
|
||||
check node.gossipsub.hasKey(topic)
|
||||
check node.gossipsub[topic].len() == numberOfNodes - 1
|
||||
check node.mesh.hasKey(topic)
|
||||
check node.mesh[topic].len() == numberOfNodes - 1
|
||||
|
||||
# When all nodes unsubscribe from the topic
|
||||
unsubscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then the topic should be removed from mesh and gossipsub
|
||||
for node in nodes:
|
||||
check topic notin node.topics
|
||||
check topic notin node.mesh
|
||||
check topic notin node.gossipsub
|
||||
|
||||
asyncTest "handle subscribe and unsubscribe for multiple topics":
|
||||
let
|
||||
numberOfNodes = 3
|
||||
topics = @["foobar1", "foobar2", "foobar3"]
|
||||
nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# When nodes subscribe to multiple topics
|
||||
await connectNodesStar(nodes)
|
||||
for topic in topics:
|
||||
subscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then all nodes should be subscribed to the topics initially
|
||||
for node in nodes:
|
||||
for topic in topics:
|
||||
check node.topics.contains(topic)
|
||||
check node.gossipsub[topic].len() == numberOfNodes - 1
|
||||
check node.mesh[topic].len() == numberOfNodes - 1
|
||||
|
||||
# When they unsubscribe from all topics
|
||||
for topic in topics:
|
||||
unsubscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then topics should be removed from mesh and gossipsub
|
||||
for node in nodes:
|
||||
for topic in topics:
|
||||
check topic notin node.topics
|
||||
check topic notin node.mesh
|
||||
check topic notin node.gossipsub
|
||||
|
||||
asyncTest "GRAFT messages correctly add peers to mesh":
|
||||
# Given 2 nodes
|
||||
let
|
||||
topic = "foobar"
|
||||
graftMessage = ControlMessage(graft: @[ControlGraft(topicID: topic)])
|
||||
numberOfNodes = 2
|
||||
# First part of the hack: Weird dValues so peers are not GRAFTed automatically
|
||||
dValues = DValues(dLow: some(0), dHigh: some(0), d: some(0), dOut: some(-1))
|
||||
nodes = generateNodes(
|
||||
numberOfNodes, gossip = true, verifySignature = false, dValues = some(dValues)
|
||||
)
|
||||
.toGossipSub()
|
||||
n0 = nodes[0]
|
||||
n1 = nodes[1]
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# And the nodes are connected
|
||||
await connectNodesStar(nodes)
|
||||
|
||||
# And both subscribe to the topic
|
||||
subscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Because of the hack-ish dValues, the peers are added to gossipsub but not GRAFTed to mesh
|
||||
check:
|
||||
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
# Stop both nodes in order to prevent GRAFT message to be sent by heartbeat
|
||||
await n0.stop()
|
||||
await n1.stop()
|
||||
|
||||
# Second part of the hack
|
||||
# Set values so peers can be GRAFTed
|
||||
let newDValues =
|
||||
some(DValues(dLow: some(1), dHigh: some(1), d: some(1), dOut: some(1)))
|
||||
n0.parameters.applyDValues(newDValues)
|
||||
n1.parameters.applyDValues(newDValues)
|
||||
|
||||
# When a GRAFT message is sent
|
||||
let p0 = n1.getOrCreatePeer(n0.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
n0.broadcast(@[p1], RPCMsg(control: some(graftMessage)), isHighPriority = false)
|
||||
n1.broadcast(@[p0], RPCMsg(control: some(graftMessage)), isHighPriority = false)
|
||||
|
||||
await waitForPeersInTable(
|
||||
nodes, topic, newSeqWith(numberOfNodes, 1), PeerTableType.Mesh
|
||||
)
|
||||
|
||||
# Then the peers are GRAFTed
|
||||
check:
|
||||
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
asyncTest "Received GRAFT for non-subscribed topic":
|
||||
# Given 2 nodes
|
||||
let
|
||||
topic = "foo"
|
||||
graftMessage = ControlMessage(graft: @[ControlGraft(topicID: topic)])
|
||||
numberOfNodes = 2
|
||||
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
|
||||
.toGossipSub()
|
||||
n0 = nodes[0]
|
||||
n1 = nodes[1]
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# And the nodes are connected
|
||||
await connectNodesStar(nodes)
|
||||
|
||||
# And only node0 subscribes to the topic
|
||||
nodes[0].subscribe(topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
check:
|
||||
n0.topics.hasKey(topic)
|
||||
not n1.topics.hasKey(topic)
|
||||
not n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
# When a GRAFT message is sent
|
||||
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
n0.broadcast(@[p1], RPCMsg(control: some(graftMessage)), isHighPriority = false)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then the peer is not GRAFTed
|
||||
check:
|
||||
n0.topics.hasKey(topic)
|
||||
not n1.topics.hasKey(topic)
|
||||
not n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
asyncTest "PRUNE messages correctly removes peers from mesh":
|
||||
# Given 2 nodes
|
||||
let
|
||||
topic = "foo"
|
||||
backoff = 1
|
||||
pruneMessage = ControlMessage(
|
||||
prune: @[ControlPrune(topicID: topic, peers: @[], backoff: uint64(backoff))]
|
||||
)
|
||||
numberOfNodes = 2
|
||||
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
|
||||
.toGossipSub()
|
||||
n0 = nodes[0]
|
||||
n1 = nodes[1]
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# And the nodes are connected
|
||||
await connectNodesStar(nodes)
|
||||
|
||||
# And both subscribe to the topic
|
||||
subscribeAllNodes(nodes, topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
check:
|
||||
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
# When a PRUNE message is sent
|
||||
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
n0.broadcast(@[p1], RPCMsg(control: some(pruneMessage)), isHighPriority = false)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then the peer is PRUNEd
|
||||
check:
|
||||
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
# When another PRUNE message is sent
|
||||
let p0 = n1.getOrCreatePeer(n0.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
n1.broadcast(@[p0], RPCMsg(control: some(pruneMessage)), isHighPriority = false)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then the peer is PRUNEd
|
||||
check:
|
||||
n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
asyncTest "Received PRUNE for non-subscribed topic":
|
||||
# Given 2 nodes
|
||||
let
|
||||
topic = "foo"
|
||||
pruneMessage =
|
||||
ControlMessage(prune: @[ControlPrune(topicID: topic, peers: @[], backoff: 1)])
|
||||
numberOfNodes = 2
|
||||
nodes = generateNodes(numberOfNodes, gossip = true, verifySignature = false)
|
||||
.toGossipSub()
|
||||
n0 = nodes[0]
|
||||
n1 = nodes[1]
|
||||
|
||||
startNodesAndDeferStop(nodes)
|
||||
|
||||
# And the nodes are connected
|
||||
await connectNodesStar(nodes)
|
||||
|
||||
# And only node0 subscribes to the topic
|
||||
n0.subscribe(topic, voidTopicHandler)
|
||||
await waitForHeartbeat()
|
||||
|
||||
check:
|
||||
n0.topics.hasKey(topic)
|
||||
not n1.topics.hasKey(topic)
|
||||
not n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
# When a PRUNE message is sent
|
||||
let p1 = n0.getOrCreatePeer(n1.peerInfo.peerId, @[GossipSubCodec_12])
|
||||
n0.broadcast(@[p1], RPCMsg(control: some(pruneMessage)), isHighPriority = false)
|
||||
await waitForHeartbeat()
|
||||
|
||||
# Then the peer is not PRUNEd
|
||||
check:
|
||||
n0.topics.hasKey(topic)
|
||||
not n1.topics.hasKey(topic)
|
||||
not n0.gossipsub.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
n1.gossipsub.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
not n0.mesh.hasPeerId(topic, n1.peerInfo.peerId)
|
||||
not n1.mesh.hasPeerId(topic, n0.peerInfo.peerId)
|
||||
|
||||
@@ -16,7 +16,6 @@ import sugar
import chronicles
import ../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, timedcache]
import ../../libp2p/protocols/pubsub/rpc/[message, protobuf]
import ../../libp2p/muxers/muxer
import ../helpers, ../utils/[futures]

const MsgIdSuccess = "msg id gen success"
@@ -78,38 +77,22 @@ suite "GossipSub Message Handling":
    checkTrackers()

  asyncTest "Drop messages of topics without subscription":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
      check false

    let topic = "foobar"
    var conns = newSeq[Connection]()
    for i in 0 ..< 30:
      let conn = TestBufferStream.new(noop)
      conns &= conn
      let peerId = randomPeerId()
      conn.peerId = peerId
      let peer = gossipSub.getPubSubPeer(peerId)
      peer.handler = handler
    var (gossipSub, conns, peers) = setupGossipSubWithPeers(30, topic)
    defer:
      await teardownGossipSub(gossipSub, conns)

    # generate messages
    var seqno = 0'u64
    for i in 0 .. 5:
      let conn = TestBufferStream.new(noop)
      conns &= conn
      let peerId = randomPeerId()
      conn.peerId = peerId
      let peer = gossipSub.getPubSubPeer(peerId)
      let conn = conns[i]
      let peer = peers[i]
      inc seqno
      let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
      let msg = Message.init(conn.peerId, ("bar" & $i).toBytes(), topic, some(seqno))
      await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false))

    check gossipSub.mcache.msgs.len == 0

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "subscription limits":
    let gossipSub = TestGossipSub.init(newStandardSwitch())
    gossipSub.topicsHigh = 10
@@ -859,3 +842,34 @@ suite "GossipSub Message Handling":
      publishResult == 0
      results[0].isPending()
      results[1].isPending()

  # check correctly parsed ihave/iwant/graft/prune/idontwant messages
  # check value before & after decoding equal using protoc cmd tool for reference
  asyncTest "ControlMessage RPCMsg encoding and decoding":
    let id: seq[byte] = @[123]
    let message = RPCMsg(
      control: some(
        ControlMessage(
          ihave: @[ControlIHave(topicID: "foobar", messageIDs: @[id])],
          iwant: @[ControlIWant(messageIDs: @[id])],
          graft: @[ControlGraft(topicID: "foobar")],
          prune: @[ControlPrune(topicID: "foobar", backoff: 10.uint64)],
          idontwant: @[ControlIWant(messageIDs: @[id])],
        )
      )
    )
    #data encoded using protoc cmd tool
    let expectedEncoded: seq[byte] =
      @[
        26, 45, 10, 11, 10, 6, 102, 111, 111, 98, 97, 114, 18, 1, 123, 18, 3, 10, 1,
        123, 26, 8, 10, 6, 102, 111, 111, 98, 97, 114, 34, 10, 10, 6, 102, 111, 111, 98,
        97, 114, 24, 10, 42, 3, 10, 1, 123,
      ]

    let actualEncoded = encodeRpcMsg(message, true)
    check:
      actualEncoded == expectedEncoded

    let actualDecoded = decodeRpcMsg(expectedEncoded).value
    check:
      actualDecoded == message

@@ -23,24 +23,18 @@ suite "GossipSub Scoring":
    checkTrackers()

  asyncTest "Disconnect bad peers":
    let gossipSub = TestGossipSub.init(newStandardSwitch())
    let topic = "foobar"
    var (gossipSub, conns, peers) =
      setupGossipSubWithPeers(30, topic, populateGossipsub = true)
    defer:
      await teardownGossipSub(gossipSub, conns)

    gossipSub.parameters.disconnectBadPeers = true
    gossipSub.parameters.appSpecificWeight = 1.0
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
      check false

    let topic = "foobar"
    var conns = newSeq[Connection]()
    for i in 0 ..< 30:
      let conn = TestBufferStream.new(noop)
      conns &= conn
      let peerId = randomPeerId()
      conn.peerId = peerId
      let peer = gossipSub.getPubSubPeer(peerId)
      peer.sendConn = conn
      peer.handler = handler
    for i, peer in peers:
      peer.appScore = gossipSub.parameters.graylistThreshold - 1
      gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
      let conn = conns[i]
      gossipSub.switch.connManager.storeMuxer(Muxer(connection: conn))

    gossipSub.updateScores()
@@ -53,9 +47,6 @@ suite "GossipSub Scoring":
      # also ensure we cleanup properly the peersInIP table
      gossipSub.peersInIP.len == 0

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "flood publish to all peers with score above threshold, regardless of subscription":
    let
      numberOfNodes = 3

@@ -5,7 +5,7 @@ const
  libp2p_pubsub_anonymize {.booldefine.} = false

import hashes, random, tables, sets, sequtils, sugar
import chronos, stew/[byteutils, results], chronos/ratelimit
import chronos, results, stew/byteutils, chronos/ratelimit
import
  ../../libp2p/[
    builders,
@@ -18,7 +18,7 @@ import
    protocols/pubsub/rpc/messages,
    protocols/secure/secure,
  ]
import ../helpers, ../utils/futures
import ../helpers
import chronicles

export builders
@@ -42,6 +42,21 @@ type
    dOut*: Option[int]
    dLazy*: Option[int]

proc noop*(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
  discard

proc voidTopicHandler*(topic: string, data: seq[byte]) {.async.} =
  discard

proc voidPeerHandler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
  discard

proc randomPeerId*(): PeerId =
  try:
    PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
  except CatchableError as exc:
    raise newException(Defect, exc.msg)

proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
  proc getConn(): Future[Connection] {.
    async: (raises: [CancelledError, GetConnDialError])
@@ -61,11 +76,57 @@ proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
  onNewPeer(p, pubSubPeer)
  pubSubPeer

proc randomPeerId*(): PeerId =
  try:
    PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
  except CatchableError as exc:
    raise newException(Defect, exc.msg)
proc setupGossipSubWithPeers*(
    numPeers: int,
    topics: seq[string],
    populateGossipsub: bool = false,
    populateMesh: bool = false,
    populateFanout: bool = false,
): (TestGossipSub, seq[Connection], seq[PubSubPeer]) =
  let gossipSub = TestGossipSub.init(newStandardSwitch())

  for topic in topics:
    gossipSub.topicParams[topic] = TopicParams.init()
    gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
    gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
    gossipSub.fanout[topic] = initHashSet[PubSubPeer]()

  var conns = newSeq[Connection]()
  var peers = newSeq[PubSubPeer]()
  for i in 0 ..< numPeers:
    let conn = TestBufferStream.new(noop)
    conns &= conn
    let peerId = randomPeerId()
    conn.peerId = peerId
    let peer = gossipSub.getPubSubPeer(peerId)
    peer.sendConn = conn
    peer.handler = voidPeerHandler
    peers &= peer
    for topic in topics:
      if (populateGossipsub):
        gossipSub.gossipsub[topic].incl(peer)
      if (populateMesh):
        gossipSub.grafted(peer, topic)
        gossipSub.mesh[topic].incl(peer)
      if (populateFanout):
        gossipSub.fanout[topic].incl(peer)

  return (gossipSub, conns, peers)

proc setupGossipSubWithPeers*(
    numPeers: int,
    topic: string,
    populateGossipsub: bool = false,
    populateMesh: bool = false,
    populateFanout: bool = false,
): (TestGossipSub, seq[Connection], seq[PubSubPeer]) =
  return setupGossipSubWithPeers(
    numPeers, @[topic], populateGossipsub, populateMesh, populateFanout
  )

proc teardownGossipSub*(gossipSub: TestGossipSub, conns: seq[Connection]) {.async.} =
  await allFuturesThrowing(conns.mapIt(it.close()))
  await gossipSub.switch.stop()

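The two helpers above are meant to be used together as a test fixture. A minimal sketch of the intended pattern (the peer count and topic name below are illustrative, not taken from any specific test):

  asyncTest "fixture usage sketch":
    let topic = "foobar"
    let (gossipSub, conns, peers) = setupGossipSubWithPeers(
      10, topic, populateGossipsub = true, populateMesh = true
    )
    defer:
      await teardownGossipSub(gossipSub, conns)

    # with populateMesh = true every generated peer was grafted into the mesh
    check gossipSub.mesh[topic].len == 10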
func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] =
  let mid =
@@ -78,7 +139,7 @@ func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] =
      $m.data.hash & $m.topic.hash
  ok mid.toBytes()

proc applyDValues(parameters: var GossipSubParams, dValues: Option[DValues]) =
proc applyDValues*(parameters: var GossipSubParams, dValues: Option[DValues]) =
  if dValues.isNone:
    return
  let values = dValues.get
@@ -168,18 +229,21 @@ proc generateNodes*(
    switch.mount(pubsub)
    result.add(pubsub)

proc connectNodes*(dialer: PubSub, target: PubSub) {.async.} =
proc toGossipSub*(nodes: seq[PubSub]): seq[GossipSub] =
  return nodes.mapIt(GossipSub(it))

proc connectNodes*[T: PubSub](dialer: T, target: T) {.async.} =
  doAssert dialer.switch.peerInfo.peerId != target.switch.peerInfo.peerId,
    "Could not connect same peer"
  await dialer.switch.connect(target.peerInfo.peerId, target.peerInfo.addrs)

proc connectNodesStar*(nodes: seq[PubSub]) {.async.} =
proc connectNodesStar*[T: PubSub](nodes: seq[T]) {.async.} =
  for dialer in nodes:
    for node in nodes:
      if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
        await connectNodes(dialer, node)

proc connectNodesSparse*(nodes: seq[PubSub], degree: int = 2) {.async.} =
proc connectNodesSparse*[T: PubSub](nodes: seq[T], degree: int = 2) {.async.} =
  if nodes.len < degree:
    raise
      (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
@@ -324,23 +388,31 @@ proc waitForPeersInTable*(
    )
    allSatisfied = checkState(nodes, topic, peerCounts, table, satisfied)

proc startNodes*(nodes: seq[PubSub]) {.async.} =
proc startNodes*[T: PubSub](nodes: seq[T]) {.async.} =
  await allFuturesThrowing(nodes.mapIt(it.switch.start()))

proc stopNodes*(nodes: seq[PubSub]) {.async.} =
proc stopNodes*[T: PubSub](nodes: seq[T]) {.async.} =
  await allFuturesThrowing(nodes.mapIt(it.switch.stop()))

template startNodesAndDeferStop*(nodes: seq[PubSub]): untyped =
template startNodesAndDeferStop*[T: PubSub](nodes: seq[T]): untyped =
  await startNodes(nodes)
  defer:
    await stopNodes(nodes)

proc subscribeAllNodes*(nodes: seq[PubSub], topic: string, topicHandler: TopicHandler) =
proc subscribeAllNodes*[T: PubSub](
    nodes: seq[T], topic: string, topicHandler: TopicHandler
) =
  for node in nodes:
    node.subscribe(topic, topicHandler)

proc subscribeAllNodes*(
    nodes: seq[PubSub], topic: string, topicHandlers: seq[TopicHandler]
proc unsubscribeAllNodes*[T: PubSub](
    nodes: seq[T], topic: string, topicHandler: TopicHandler
) =
  for node in nodes:
    node.unsubscribe(topic, topicHandler)

proc subscribeAllNodes*[T: PubSub](
    nodes: seq[T], topic: string, topicHandlers: seq[TopicHandler]
) =
  if nodes.len != topicHandlers.len:
    raise (ref CatchableError)(msg: "nodes and topicHandlers count needs to match!")
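Together with generateNodes and waitForHeartbeat, these generic helpers form the skeleton used by the node-based tests earlier in this diff. A condensed sketch (node count and topic illustrative):

  asyncTest "node-based skeleton sketch":
    let
      numberOfNodes = 3
      topic = "foobar"
      nodes = generateNodes(numberOfNodes, gossip = true).toGossipSub()

    startNodesAndDeferStop(nodes)
    await connectNodesStar(nodes)
    subscribeAllNodes(nodes, topic, voidTopicHandler)
    await waitForHeartbeat()

    # in a fully connected star, every node should have the others in its mesh
    for node in nodes:
      check node.mesh[topic].len == numberOfNodes - 1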
@@ -360,12 +432,6 @@ template tryPublish*(

  doAssert pubs >= require, "Failed to publish!"

proc noop*(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
  discard

proc voidTopicHandler*(topic: string, data: seq[byte]) {.async.} =
  discard

proc createCompleteHandler*(): (
  Future[bool], proc(topic: string, data: seq[byte]) {.async.}
) =

@@ -11,8 +11,9 @@

{.push raises: [].}

import std/net
import tables
import chronos, stew/[byteutils, endians2, shims/net]
import chronos, stew/[byteutils, endians2]
import
  ../../libp2p/[
    stream/connection,
@@ -62,7 +63,8 @@ proc start*(self: TorServerStub, address: TransportAddress) {.async.} =
        var ip: array[4, byte]
        for i, e in msg[0 ..^ 3]:
          ip[i] = e
        $(ipv4(ip)) & ":" & $(Port(fromBytesBE(uint16, msg[^2 ..^ 1])))
        $(IpAddress(family: IPv4, address_v4: ip)) & ":" &
          $(Port(fromBytesBE(uint16, msg[^2 ..^ 1])))
      of Socks5AddressType.IPv6.byte:
        let n = 16 + 2 # +2 bytes for the port
        msg = newSeq[byte](n) # +2 bytes for the port
@@ -70,7 +72,8 @@ proc start*(self: TorServerStub, address: TransportAddress) {.async.} =
        var ip: array[16, byte]
        for i, e in msg[0 ..^ 3]:
          ip[i] = e
        $(ipv6(ip)) & ":" & $(Port(fromBytesBE(uint16, msg[^2 ..^ 1])))
        $(IpAddress(family: IPv6, address_v6: ip)) & ":" &
          $(Port(fromBytesBE(uint16, msg[^2 ..^ 1])))
      of Socks5AddressType.FQDN.byte:
        await connSrc.readExactly(addr msg[0], 1)
        let n = int(uint8.fromBytes(msg[0 .. 0])) + 2 # +2 bytes for the port

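For context, this hunk swaps what appear to be the stew shims/net ipv4/ipv6 helpers for std/net's IpAddress object. A small self-contained sketch of the same construction (the byte values below are illustrative, not taken from the stub):

import std/net
import stew/endians2

# build "<ip>:<port>" from raw SOCKS5 address bytes, as the stub above does
let ipBytes: array[4, byte] = [127'u8, 0, 0, 1]
let portBytes = [0x1F'u8, 0x90] # 8080 in big-endian
let formatted =
  $(IpAddress(family: IpAddressFamily.IPv4, address_v4: ipBytes)) & ":" &
  $fromBytesBE(uint16, portBytes)
doAssert formatted == "127.0.0.1:8080"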
@@ -10,7 +10,7 @@
# those terms.

import chronos, stew/byteutils
import ../libp2p/stream/bufferstream, ../libp2p/stream/lpstream, ../libp2p/errors
import ../libp2p/stream/bufferstream, ../libp2p/stream/lpstream

import ./helpers

@@ -10,11 +10,9 @@
# those terms.

import std/[sequtils, tables]
import stew/results
import results
import chronos
import
  ../libp2p/
    [connmanager, stream/connection, crypto/crypto, muxers/muxer, peerinfo, errors]
import ../libp2p/[connmanager, stream/connection, crypto/crypto, muxers/muxer, peerinfo]

import helpers

@@ -18,7 +18,7 @@ import
    discovery/discoverymngr,
    discovery/rendezvousinterface,
  ]
import ./helpers, ./utils/[futures, async_tests]
import ./helpers, ./utils/async_tests

proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
  SwitchBuilder

@@ -1,6 +1,6 @@
import helpers, commoninterop
import ../libp2p
import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/[relay, client]
import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/relay

proc switchMplexCreator(
    ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),

@@ -12,7 +12,7 @@
import sequtils, strutils
import chronos
import ../libp2p/[protocols/rendezvous, switch, builders]
import ../libp2p/discovery/[rendezvousinterface, discoverymngr]
import ../libp2p/discovery/discoverymngr
import ./helpers

proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =

@@ -9,8 +9,7 @@
# This file may not be copied, modified, or distributed except according to
# those terms.

import std/[options, sequtils]
import stew/[byteutils]
import std/options
import chronos, metrics
import unittest2
import ../libp2p/[builders, switch]

@@ -13,6 +13,6 @@ COPY . nim-libp2p/

RUN \
    cd nim-libp2p && \
    nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim
    nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:libp2p_quic_support -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim

ENTRYPOINT ["/app/nim-libp2p/tests/transport-interop/main"]

@@ -99,7 +99,18 @@ proc main() {.async.} =
      pingRTTMilllis: float(pingDelay.milliseconds),
    )
  )
  quit(0)

discard waitFor(main().withTimeout(testTimeout))
quit(1)
try:
  proc mainAsync(): Future[string] {.async.} =
    # mainAsync wraps main and returns some value, as otherwise
    # 'waitFor(fut)' has no type (or is ambiguous)
    await main()
    return "done"

  discard waitFor(mainAsync().wait(testTimeout))
except AsyncTimeoutError:
  error "Program execution timed out."
  quit(-1)
except CatchableError as e:
  error "Unexpected error", description = e.msg
  quit(-1)

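The comment in the new code gives the reason for the wrapper, and the hunk also replaces withTimeout (which returns a bool) with wait(), which raises AsyncTimeoutError on expiry. A minimal standalone sketch of the same chronos pattern (proc names and durations illustrative):

import chronos

proc work() {.async.} =
  await sleepAsync(10.milliseconds)

proc workAsync(): Future[string] {.async.} =
  # wrap the void proc so that waitFor(...) has a concrete result type
  await work()
  return "done"

# wait() raises AsyncTimeoutError if the future does not complete in time
discard waitFor(workAsync().wait(1.seconds))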
@@ -1,6 +1,8 @@
{.used.}

import unittest2

import times, base64
import times
import ../../../libp2p/transports/tls/certificate
import ../../../libp2p/transports/tls/certificate_ffi
import ../../../libp2p/crypto/crypto

@@ -1,4 +1,4 @@
import chronos/futures, stew/results, chronos, sequtils
import chronos/futures, results, chronos, sequtils

const
  DURATION_TIMEOUT* = 1.seconds