Compare commits

...

17 Commits

Author SHA1 Message Date
Tanguy
01c2c0c54b Ring buffer implem
Not fully finished or tested, but good to know it exists
2022-03-18 12:01:02 +01:00
Tanguy
eaa72dcdbe WS Accept timeout (#699)
* Add timeout to WS accept
* Handle more WS errors
2022-03-17 10:16:48 +01:00
Tanguy
c7504d2446 Gossipsub peer exchange (#647)
* Signed envelopes and routing records
* Send signed peer record as part of identify (#649)
* Add SPR from identify to new peer book (#657)
* Send & receive gossipsub PX
* Add Signed Payload

Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
2022-03-14 09:39:30 +01:00
Tanguy
cba3ca3c3e Unstable (#695) 2022-02-25 09:41:32 +01:00
Eric Mastro
44a7260f07 fixes from #688 (#697)
* tests: invert message logic on expect from #688
* fix: export pubsub_errors for backward compatibility
2022-02-24 17:32:20 +01:00
Tanguy
c09d032133 Allow force dial (#696) 2022-02-24 17:31:47 +01:00
Tanguy
f98bf612bd Fix tests of #638 2022-02-21 18:14:43 +01:00
Tanguy
fd59cbc7a9 Fix shuffle of #638 2022-02-21 17:00:18 +01:00
Tanguy
bc318084f4 GS: Publish to fanout when mesh unhealthy (#638)
* Send to fanout when mesh unhealthy

* don't use fanout when floodPublish
2022-02-21 16:22:08 +01:00
Eric Mastro
3b718baa97 feat: allow msgIdProvider to fail (#688)
* feat: allow msgIdProvider to fail

Closes: #642.

Changes the return type of the msgIdProvider to `Result[MessageID, string]` so that message id generation can fail; a usage sketch follows this entry.

The string error type was chosen for `msgIdProvider` mainly because failed message id generation drops the message and logs the provided error. Because `msgIdProvider` can be externally provided by library consumers, an enum didn’t make sense and an object seemed to be overkill. Exceptions could have been used as well; however, in this case Result ergonomics were warranted and avoided wrapping quite a large block of code in try/except.

The `defaultMsgIdProvider` function previously allowed message id generation to fail silently: when the seqno or source peer id was not valid, the generated message id was based on a hash of the message data and topic ids. That silent fallback was moved to a `defaultMsgIdProvider` used only in the tests, so that message id generation can no longer fail silently in applications.

Unit tests were added for the `defaultMsgIdProvider`.

* Change MsgIdProvider error type to ValidationResult
2022-02-21 16:04:17 +01:00
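
A minimal sketch of a custom provider under the new signature, using the final `ValidationResult` error type from the last bullet; the import paths are assumptions:

    import pkg/stew/[results, byteutils]
    import libp2p/protocols/pubsub/errors        # ValidationResult
    import libp2p/protocols/pubsub/rpc/messages  # Message, MessageID

    func strictMsgIdProvider(m: Message): Result[MessageID, ValidationResult] =
      # Accept only messages carrying a seqno and an origin peer;
      # rejecting makes the router drop the message.
      if m.seqno.len > 0 and m.fromPeer.data.len > 0:
        ok((byteutils.toHex(m.seqno) & $m.fromPeer).toBytes())
      else:
        err(ValidationResult.Reject)

Such a provider is handed to the pubsub instance at construction time, via the `MsgIdProvider` type shown in the pubsub diff below.
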
Tanguy
9a7e3bda3c Bump dependencies (#694) 2022-02-10 14:21:12 +01:00
Tanguy
00e1f9342f Fix identify log for json_sink (#690) 2022-02-01 18:35:48 +01:00
Tanguy
07da14a7a7 Fix websocket EOF reading exception (#689) 2022-01-28 18:05:07 +00:00
Tanguy
c18830ad33 Score correctly on mesh peer unsub (#644)
* Score correctly on mesh peer unsub
* remove from mesh before removing from gossipsub
2022-01-15 12:47:41 +01:00
Tanguy
1a97d0a2f5 Validate pubsub subscriptions (#627)
* Check topic before subscribing
* Block subscribe to invalid topics
2022-01-14 12:40:30 -06:00
Ștefan Talpalaru
e72d03bc78 CI: test with multiple Nim versions by default (#684)
* CI: test with 1.2 & devel by default
* Skip buggy windows websocket test
2022-01-10 12:29:52 +01:00
Tanguy
388b92d58f Bump dependencies (#683) 2022-01-07 08:19:22 +01:00
40 changed files with 1236 additions and 333 deletions

View File

@@ -9,78 +9,49 @@ on:
jobs:
build:
timeout-minutes: 90
strategy:
fail-fast: false
max-parallel: 20
matrix:
target:
# Unit tests
- os: linux
cpu: amd64
- os: linux
cpu: i386
- os: macos
cpu: amd64
- os: windows
cpu: i386
- os: windows
cpu: amd64
#- os: windows
#cpu: i386
branch: [version-1-2, devel]
include:
- target:
os: linux
builder: ubuntu-20.04
shell: bash
- target:
os: macos
builder: macos-10.15
shell: bash
- target:
os: windows
builder: windows-2019
shell: msys2 {0}
defaults:
run:
shell: bash
shell: ${{ matrix.shell }}
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }}'
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
runs-on: ${{ matrix.builder }}
continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }}
steps:
- name: Checkout nim-libp2p
- name: Checkout
uses: actions/checkout@v2
with:
submodules: true
- name: Derive environment variables
run: |
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
ARCH=64
PLATFORM=x64
else
ARCH=32
PLATFORM=x86
fi
echo "ARCH=$ARCH" >> $GITHUB_ENV
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
ncpu=
ext=
MAKE_CMD="make"
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
ext=.exe
MAKE_CMD="mingw32-make"
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
echo "ncpu=$ncpu" >> $GITHUB_ENV
echo "ext=$ext" >> $GITHUB_ENV
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
- name: Install build dependencies (Linux i386)
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
run: |
@@ -101,68 +72,83 @@ jobs:
chmod 755 external/bin/gcc external/bin/g++
echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH
- name: Restore MinGW-W64 (Windows) from cache
if: runner.os == 'Windows'
id: windows-mingw-cache
uses: actions/cache@v2
- name: MSYS2 (Windows i386)
if: runner.os == 'Windows' && matrix.target.cpu == 'i386'
uses: msys2/setup-msys2@v2
with:
path: external/mingw-${{ matrix.target.cpu }}
key: 'mingw-${{ matrix.target.cpu }}'
path-type: inherit
msystem: MINGW32
install: >-
base-devel
git
mingw-w64-i686-toolchain
- name: MSYS2 (Windows amd64)
if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
uses: msys2/setup-msys2@v2
with:
path-type: inherit
install: >-
base-devel
git
mingw-w64-x86_64-toolchain
- name: Restore Nim DLLs dependencies (Windows) from cache
if: runner.os == 'Windows'
id: windows-dlls-cache
uses: actions/cache@v2
with:
path: external/dlls-${{ matrix.target.cpu }}
key: 'dlls-${{ matrix.target.cpu }}'
path: external/dlls
key: 'dlls'
- name: Install MinGW64 dependency (Windows)
if: >
steps.windows-mingw-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
run: |
mkdir -p external
curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z"
7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/
mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }}
- name: Install DLLs dependencies (Windows)
- name: Install DLL dependencies (Windows)
if: >
steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
run: |
mkdir -p external
mkdir external
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
7z x external/windeps.zip -oexternal/dlls
- name: Path to cached dependencies (Windows)
if: >
runner.os == 'Windows'
run: |
echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH
echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH
- name: Get latest Nim commit hash
id: versions
- name: Derive environment variables
run: |
getHash() {
git ls-remote "https://github.com/$1" "${2:-HEAD}" | cut -f 1
}
nbsHash=$(getHash status-im/nimbus-build-system)
echo "::set-output name=nimbus_build_system::$nbsHash"
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
PLATFORM=x64
else
PLATFORM=x86
fi
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
- name: Restore prebuilt Nim from cache
id: nim-cache
uses: actions/cache@v2
with:
path: NimBinaries
key: 'NimBinaries-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nimbus_build_system }}'
ncpu=
MAKE_CMD="make"
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
MAKE_CMD="mingw32-make"
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
echo "ncpu=$ncpu" >> $GITHUB_ENV
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
- name: Build Nim and associated tools
- name: Build Nim and Nimble
run: |
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} CC=gcc bash build_nim.sh nim csources dist/nimble NimBinaries
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ matrix.branch }} \
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
bash build_nim.sh nim csources dist/nimble NimBinaries
echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH
- name: Setup Go
@@ -174,8 +160,14 @@ jobs:
run: |
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
- name: Run nim-libp2p tests
- name: Run tests
run: |
if [[ "${{ matrix.target.os }}" == "windows" ]]; then
# https://github.com/status-im/nimbus-eth2/issues/3121
export NIMFLAGS="-d:nimRawSetjmp"
fi
nim --version
nimble --version
nimble install_pinned
nimble test

View File

@@ -6,6 +6,7 @@ on:
jobs:
build:
timeout-minutes: 120
strategy:
fail-fast: false
matrix:
@@ -16,69 +17,39 @@ jobs:
cpu: i386
- os: macos
cpu: amd64
#- os: windows
#cpu: i386
- os: windows
cpu: amd64
#- os: windows
#cpu: i386
branch: [version-1-2, version-1-4, version-1-6, devel]
include:
- target:
os: linux
builder: ubuntu-20.04
shell: bash
- target:
os: macos
builder: macos-10.15
shell: bash
- target:
os: windows
builder: windows-2019
shell: msys2 {0}
defaults:
run:
shell: bash
shell: ${{ matrix.shell }}
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
runs-on: ${{ matrix.builder }}
continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }}
steps:
- name: Checkout nim-libp2p
- name: Checkout
uses: actions/checkout@v2
with:
ref: master
ref: unstable
submodules: true
- name: Derive environment variables
run: |
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
ARCH=64
PLATFORM=x64
else
ARCH=32
PLATFORM=x86
fi
echo "ARCH=$ARCH" >> $GITHUB_ENV
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
ncpu=
ext=
MAKE_CMD="make"
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
ext=.exe
MAKE_CMD="mingw32-make"
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
echo "ncpu=$ncpu" >> $GITHUB_ENV
echo "ext=$ext" >> $GITHUB_ENV
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
- name: Install build dependencies (Linux i386)
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
run: |
@@ -99,47 +70,76 @@ jobs:
chmod 755 external/bin/gcc external/bin/g++
echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH
- name: Restore MinGW-W64 (Windows) from cache
if: runner.os == 'Windows'
id: windows-mingw-cache
uses: actions/cache@v2
- name: MSYS2 (Windows i386)
if: runner.os == 'Windows' && matrix.target.cpu == 'i386'
uses: msys2/setup-msys2@v2
with:
path: external/mingw-${{ matrix.target.cpu }}
key: 'mingw-${{ matrix.target.cpu }}'
path-type: inherit
msystem: MINGW32
install: >-
base-devel
git
mingw-w64-i686-toolchain
- name: MSYS2 (Windows amd64)
if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
uses: msys2/setup-msys2@v2
with:
path-type: inherit
install: >-
base-devel
git
mingw-w64-x86_64-toolchain
- name: Restore Nim DLLs dependencies (Windows) from cache
if: runner.os == 'Windows'
id: windows-dlls-cache
uses: actions/cache@v2
with:
path: external/dlls-${{ matrix.target.cpu }}
key: 'dlls-${{ matrix.target.cpu }}'
path: external/dlls
key: 'dlls'
- name: Install MinGW64 dependency (Windows)
if: >
steps.windows-mingw-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
run: |
mkdir -p external
curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z"
7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/
mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }}
- name: Install DLLs dependencies (Windows)
- name: Install DLL dependencies (Windows)
if: >
steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
run: |
mkdir -p external
mkdir external
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
7z x external/windeps.zip -oexternal/dlls
- name: Path to cached dependencies (Windows)
if: >
runner.os == 'Windows'
run: |
echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH
echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH
- name: Derive environment variables
run: |
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
PLATFORM=x64
else
PLATFORM=x86
fi
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
ncpu=
MAKE_CMD="make"
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
MAKE_CMD="mingw32-make"
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
echo "ncpu=$ncpu" >> $GITHUB_ENV
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
- name: Build Nim and Nimble
run: |
@@ -149,12 +149,27 @@ jobs:
bash build_nim.sh nim csources dist/nimble NimBinaries
echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH
- name: Run nim-libp2p tests
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '^1.15.5'
- name: Install p2pd
run: |
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
- name: Run tests
run: |
if [[ "${{ matrix.target.os }}" == "windows" ]]; then
# https://github.com/status-im/nimbus-eth2/issues/3121
export NIMFLAGS="-d:nimRawSetjmp"
fi
nim --version
nimble --version
nimble install -y --depsOnly
nimble test_slim
nimble test
if [[ "${{ matrix.branch }}" == "version-1-6" || "${{ matrix.branch }}" == "devel" ]]; then
echo -e "\nTesting with '--gc:orc':\n"
export NIMFLAGS="--gc:orc"
nimble test_slim
export NIMFLAGS="${NIMFLAGS} --gc:orc"
nimble test
fi

.pinned
View File

@@ -1,17 +1,17 @@
asynctest;https://github.com/markspanbroek/asynctest@#3882ed64ed3159578f796bc5ae0c6b13837fe798
bearssl;https://github.com/status-im/nim-bearssl@#ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7
chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882
chronos;https://github.com/status-im/nim-chronos@#7dc58d42b6905a7fd7531875fa76060f8f744e4e
chronos;https://github.com/status-im/nim-chronos@#87197230779002a2bfa8642f0e2ae07e2349e304
dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be
faststreams;https://github.com/status-im/nim-faststreams@#c653d05f277dca0f374732c5b9b80f2368faea33
httputils;https://github.com/status-im/nim-http-utils@#507bfb7dcb6244d76ce2567df7bf3756cbe88775
json_serialization;https://github.com/status-im/nim-json-serialization@#010aa238cf6afddf1fbe4cbcd27ab3be3f443841
metrics;https://github.com/status-im/nim-metrics@#2c0c486c65f980e8387f86bed0b43d53161c8286
faststreams;https://github.com/status-im/nim-faststreams@#37a183153c071539ab870f427c09a1376ba311b9
httputils;https://github.com/status-im/nim-http-utils@#40048e8b3e69284bdb5d4daa0a16ad93402c55db
json_serialization;https://github.com/status-im/nim-json-serialization@#4b8f487d2dfdd941df7408ceaa70b174cce02180
metrics;https://github.com/status-im/nim-metrics@#71e0f0e354e1f4c59e3dc92153989c8b723c3440
nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00
secp256k1;https://github.com/status-im/nim-secp256k1@#d790c42206fab4b8008eaa91181ca8c8c68a0105
serialization;https://github.com/status-im/nim-serialization@#11a8aa64d27d4fa92e266b9488500461da193c24
stew;https://github.com/status-im/nim-stew@#2f9c61f485e1de6d7e163294008276c455d39da2
secp256k1;https://github.com/status-im/nim-secp256k1@#e092373a5cbe1fa25abfc62e0f2a5f138dc3fb13
serialization;https://github.com/status-im/nim-serialization@#37bc0db558d85711967acb16e9bb822b06911d46
stew;https://github.com/status-im/nim-stew@#bb705bf17b46d2c8f9bfb106d9cc7437009a2501
testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2
unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c
websock;https://github.com/status-im/nim-websock@#c2aae352f7fad7a8d333327c37e966969d3ee542
zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd
websock;https://github.com/status-im/nim-websock@#853299e399746eff4096870067cbc61861ecd534
zlib;https://github.com/status-im/nim-zlib@#74cdeb54b21bededb5a515d36f608bc1850555a2

View File

@@ -27,9 +27,8 @@ const nimflags =
proc runTest(filename: string, verify: bool = true, sign: bool = true,
moreoptions: string = "") =
let env_nimflags = getEnv("NIMFLAGS")
var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics "
excstr.add(" " & env_nimflags & " ")
excstr.add(" " & getEnv("NIMFLAGS") & " ")
excstr.add(" " & nimflags & " ")
excstr.add(" -d:libp2p_pubsub_sign=" & $sign)
excstr.add(" -d:libp2p_pubsub_verify=" & $verify)
@@ -91,7 +90,7 @@ task test, "Runs the test suite":
exec "nimble testfilter"
exec "nimble examples_build"
task test_slim, "Runs the test suite":
task test_slim, "Runs the (slimmed down) test suite":
exec "nimble testnative"
exec "nimble testpubsub_slim"
exec "nimble testfilter"

View File

@@ -42,6 +42,7 @@ type
rng: ref BrHmacDrbgContext
maxConnections: int
maxIn: int
sendSignedPeerRecord: bool
maxOut: int
maxConnsPerPeer: int
protoVersion: string
@@ -77,6 +78,9 @@ proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress]): SwitchBuild
b.addresses = addresses
b
proc withSignedPeerRecord*(b: SwitchBuilder, sendIt = true): SwitchBuilder =
b.sendSignedPeerRecord = sendIt
b
proc withMplex*(b: SwitchBuilder, inTimeout = 5.minutes, outTimeout = 5.minutes): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer =
@@ -165,7 +169,7 @@ proc build*(b: SwitchBuilder): Switch
muxers
let
identify = Identify.new(peerInfo)
identify = Identify.new(peerInfo, b.sendSignedPeerRecord)
connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut)
ms = MultistreamSelect.new()
muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagerInstances, connManager, ms)
@@ -209,7 +213,8 @@ proc newStandardSwitch*(
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer,
nameResolver: NameResolver = nil): Switch
nameResolver: NameResolver = nil,
sendSignedPeerRecord = false): Switch
{.raises: [Defect, LPError].} =
if SecureProtocol.Secio in secureManagers:
quit("Secio is deprecated!") # use of secio is unsafe
@@ -219,6 +224,7 @@ proc newStandardSwitch*(
.new()
.withAddresses(addrs)
.withRng(rng)
.withSignedPeerRecord(sendSignedPeerRecord)
.withMaxConnections(maxConnections)
.withMaxIn(maxIn)
.withMaxOut(maxOut)
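
For context, a sketch of opting in through the builder; the other `with*` calls are the library's existing options and the listen address is a placeholder:

    let switch = SwitchBuilder
      .new()
      .withRng(newRng())
      .withAddresses(@[MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()])
      .withTcpTransport()
      .withMplex()
      .withNoise()
      .withSignedPeerRecord(true)   # new in this change, defaults to false
      .build()

The shortcut `newStandardSwitch(sendSignedPeerRecord = true)` achieves the same, per the signature change above.
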

View File

@@ -452,7 +452,8 @@ proc trackIncomingConn*(c: ConnManager,
raise exc
proc trackOutgoingConn*(c: ConnManager,
provider: ConnProvider):
provider: ConnProvider,
forceDial = false):
Future[Connection] {.async.} =
## try acquiring a connection if all slots
## are already taken, raise TooManyConnectionsError
@@ -462,7 +463,9 @@ proc trackOutgoingConn*(c: ConnManager,
trace "Tracking outgoing connection", count = c.outSema.count,
max = c.outSema.size
if not c.outSema.tryAcquire():
if forceDial:
c.outSema.forceAcquire()
elif not c.outSema.tryAcquire():
trace "Too many outgoing connections!", count = c.outSema.count,
max = c.outSema.size
raise newTooManyConnectionsError()
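
In effect, `forceDial` swaps the semaphore's failing `tryAcquire` for the new `forceAcquire` (see the semaphore diff at the end of this comparison). A sketch of the intended use, assuming `switch`, `peerId` and `addrs` in scope and that the limit error reaches the caller:

    try:
      await switch.connect(peerId, addrs)
    except TooManyConnectionsError:
      # This peer matters more than the connection limit: force a slot.
      await switch.connect(peerId, addrs, forceDial = true)
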

View File

@@ -19,7 +19,8 @@ type
method connect*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress]) {.async, base.} =
addrs: seq[MultiAddress],
forceDial = false) {.async, base.} =
## connect remote peer without negotiating
## a protocol
##
@@ -29,7 +30,8 @@ method connect*(
method dial*(
self: Dial,
peerId: PeerId,
protos: seq[string]): Future[Connection] {.async, base.} =
protos: seq[string],
): Future[Connection] {.async, base.} =
## create a protocol stream over an
## existing connection
##
@@ -40,7 +42,8 @@ method dial*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string]): Future[Connection] {.async, base.} =
protos: seq[string],
forceDial = false): Future[Connection] {.async, base.} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##

View File

@@ -47,7 +47,8 @@ type
proc dialAndUpgrade(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress]):
addrs: seq[MultiAddress],
forceDial: bool):
Future[Connection] {.async.} =
debug "Dialing peer", peerId
@@ -72,7 +73,8 @@ proc dialAndUpgrade(
transportCopy = transport
addressCopy = a
await self.connManager.trackOutgoingConn(
() => transportCopy.dial(hostname, addressCopy)
() => transportCopy.dial(hostname, addressCopy),
forceDial
)
except TooManyConnectionsError as exc:
trace "Connection limit reached!"
@@ -112,7 +114,8 @@ proc dialAndUpgrade(
proc internalConnect(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress]):
addrs: seq[MultiAddress],
forceDial: bool):
Future[Connection] {.async.} =
if self.localPeerId == peerId:
raise newException(CatchableError, "can't dial self!")
@@ -136,7 +139,7 @@ proc internalConnect(
trace "Reusing existing connection", conn, direction = $conn.dir
return conn
conn = await self.dialAndUpgrade(peerId, addrs)
conn = await self.dialAndUpgrade(peerId, addrs, forceDial)
if isNil(conn): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")
@@ -159,7 +162,8 @@ proc internalConnect(
method connect*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress]) {.async.} =
addrs: seq[MultiAddress],
forceDial = false) {.async.} =
## connect remote peer without negotiating
## a protocol
##
@@ -167,7 +171,7 @@ method connect*(
if self.connManager.connCount(peerId) > 0:
return
discard await self.internalConnect(peerId, addrs)
discard await self.internalConnect(peerId, addrs, forceDial)
proc negotiateStream(
self: Dialer,
@@ -200,7 +204,8 @@ method dial*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string]): Future[Connection] {.async.} =
protos: seq[string],
forceDial = false): Future[Connection] {.async.} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
@@ -218,7 +223,7 @@ method dial*(
try:
trace "Dialing (new)", peerId, protos
conn = await self.internalConnect(peerId, addrs)
conn = await self.internalConnect(peerId, addrs, forceDial)
trace "Opening stream", conn
stream = await self.connManager.getStream(conn)

View File

@@ -11,9 +11,9 @@
import std/[options, sequtils, hashes]
import pkg/[chronos, chronicles, stew/results]
import peerid, multiaddress, crypto/crypto, errors
import peerid, multiaddress, crypto/crypto, routing_record, errors
export peerid, multiaddress, crypto, errors, results
export peerid, multiaddress, crypto, routing_record, errors, results
## Our local peer info
@@ -28,6 +28,7 @@ type
agentVersion*: string
privateKey*: PrivateKey
publicKey*: PublicKey
signedPeerRecord*: Option[Envelope]
func shortLog*(p: PeerInfo): auto =
(
@@ -52,14 +53,26 @@ proc new*(
key.getPublicKey().tryGet()
except CatchableError:
raise newException(PeerInfoError, "invalid private key")
let peerId = PeerID.init(key).tryGet()
let sprRes = SignedPeerRecord.init(
key,
PeerRecord.init(peerId, @addrs)
)
let spr = if sprRes.isOk:
some(sprRes.get().envelope)
else:
none(Envelope)
let peerInfo = PeerInfo(
peerId: PeerId.init(key).tryGet(),
peerId: peerId,
publicKey: pubkey,
privateKey: key,
protoVersion: protoVersion,
agentVersion: agentVersion,
addrs: @addrs,
protocols: @protocols)
protocols: @protocols,
signedPeerRecord: spr)
return peerInfo

View File

@@ -14,6 +14,7 @@ import
./crypto/crypto,
./protocols/identify,
./peerid, ./peerinfo,
./routing_record,
./multiaddress
type
@@ -53,6 +54,8 @@ type
agentBook*: PeerBook[string]
protoVersionBook*: PeerBook[string]
signedPeerRecordBook*: PeerBook[Envelope]
## Constructs a new PeerStore with metadata of type M
proc new*(T: type PeerStore): PeerStore =
@@ -160,3 +163,6 @@ proc updatePeerInfo*(
if info.protos.len > 0:
peerStore.protoBook.set(info.peerId, info.protos)
if info.signedPeerRecord.isSome:
peerStore.signedPeerRecordBook.set(info.peerId, info.signedPeerRecord.get())

View File

@@ -9,7 +9,7 @@
{.push raises: [Defect].}
import options
import std/[sequtils, options, strutils, sugar]
import chronos, chronicles
import ../protobuf/minprotobuf,
../peerinfo,
@@ -44,9 +44,11 @@ type
protoVersion*: Option[string]
agentVersion*: Option[string]
protos*: seq[string]
signedPeerRecord*: Option[Envelope]
Identify* = ref object of LPProtocol
peerInfo*: PeerInfo
sendSignedPeerRecord*: bool
IdentifyPushHandler* = proc (
peer: PeerId,
@@ -57,8 +59,23 @@ type
IdentifyPush* = ref object of LPProtocol
identifyHandler: IdentifyPushHandler
proc encodeMsg*(peerInfo: PeerInfo, observedAddr: MultiAddress): ProtoBuffer
{.raises: [Defect, IdentifyNoPubKeyError].} =
chronicles.expandIt(IdentifyInfo):
pubkey = ($it.pubkey).shortLog
addresses = it.addrs.map(x => $x).join(",")
protocols = it.protos.map(x => $x).join(",")
observable_address =
if it.observedAddr.isSome(): $it.observedAddr.get()
else: "None"
proto_version = it.protoVersion.get("None")
agent_version = it.agentVersion.get("None")
signedPeerRecord =
# The SPR contains the same data as the identify message;
# logging it as well would be cumbersome
if it.signedPeerRecord.isSome(): "Some"
else: "None"
proc encodeMsg(peerInfo: PeerInfo, observedAddr: MultiAddress, sendSpr: bool): ProtoBuffer
{.raises: [Defect].} =
result = initProtoBuffer()
let pkey = peerInfo.publicKey
@@ -76,6 +93,14 @@ proc encodeMsg*(peerInfo: PeerInfo, observedAddr: MultiAddress): ProtoBuffer
else:
peerInfo.agentVersion
result.write(6, agentVersion)
## Optionally populate signedPeerRecord field.
## See https://github.com/libp2p/go-libp2p/blob/ddf96ce1cfa9e19564feb9bd3e8269958bbc0aba/p2p/protocol/identify/pb/identify.proto for reference.
if peerInfo.signedPeerRecord.isSome() and sendSpr:
let sprBuff = peerInfo.signedPeerRecord.get().encode()
if sprBuff.isOk():
result.write(8, sprBuff.get())
result.finish()
proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
@@ -85,6 +110,7 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
oaddr: MultiAddress
protoVersion: string
agentVersion: string
signedPeerRecord: SignedPeerRecord
var pb = initProtoBuffer(buf)
@@ -95,8 +121,11 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
let r5 = pb.getField(5, protoVersion)
let r6 = pb.getField(6, agentVersion)
let r8 = pb.getField(8, signedPeerRecord)
let res = r1.isOk() and r2.isOk() and r3.isOk() and
r4.isOk() and r5.isOk() and r6.isOk()
r4.isOk() and r5.isOk() and r6.isOk() and
r8.isOk()
if res:
if r1.get():
@@ -107,18 +136,24 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
iinfo.protoVersion = some(protoVersion)
if r6.get():
iinfo.agentVersion = some(agentVersion)
debug "decodeMsg: decoded message", pubkey = ($pubkey).shortLog,
addresses = $iinfo.addrs, protocols = $iinfo.protos,
observable_address = $iinfo.observedAddr,
proto_version = $iinfo.protoVersion,
agent_version = $iinfo.agentVersion
if r8.get() and r1.get():
if iinfo.pubkey.get() == signedPeerRecord.envelope.publicKey:
iinfo.signedPeerRecord = some(signedPeerRecord.envelope)
debug "decodeMsg: decoded identify", iinfo
some(iinfo)
else:
trace "decodeMsg: failed to decode received message"
none[IdentifyInfo]()
proc new*(T: typedesc[Identify], peerInfo: PeerInfo): T =
let identify = T(peerInfo: peerInfo)
proc new*(
T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false
): T =
let identify = T(
peerInfo: peerInfo,
sendSignedPeerRecord: sendSignedPeerRecord
)
identify.init()
identify
@@ -126,7 +161,7 @@ method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
try:
trace "handling identify request", conn
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
var pb = encodeMsg(p.peerInfo, conn.observedAddr, p.sendSignedPeerRecord)
await conn.writeLp(pb.buffer)
except CancelledError as exc:
raise exc
@@ -206,5 +241,5 @@ proc init*(p: IdentifyPush) =
p.codec = IdentifyPushCodec
proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async.} =
var pb = encodeMsg(peerInfo, conn.observedAddr)
var pb = encodeMsg(peerInfo, conn.observedAddr, true)
await conn.writeLp(pb.buffer)

View File

@@ -0,0 +1,6 @@
# this module will be further extended in PR
# https://github.com/status-im/nim-libp2p/pull/107/
type
ValidationResult* {.pure.} = enum
Accept, Reject, Ignore

View File

@@ -96,7 +96,14 @@ method rpcHandler*(f: FloodSub,
f.handleSubscribe(peer, sub.topic, sub.subscribe)
for msg in rpcMsg.messages: # for every message
let msgId = f.msgIdProvider(msg)
let msgIdResult = f.msgIdProvider(msg)
if msgIdResult.isErr:
debug "Dropping message due to failed message id generation",
error = msgIdResult.error
# TODO: descore peers due to error during message validation (malicious?)
continue
let msgId = msgIdResult.get
if f.addSeen(msgId):
trace "Dropping already-seen message", msgId, peer
@@ -184,7 +191,14 @@ method publish*(f: FloodSub,
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign)
msgId = f.msgIdProvider(msg)
msgIdResult = f.msgIdProvider(msg)
if msgIdResult.isErr:
trace "Error generating message id, skipping publish",
error = msgIdResult.error
return 0
let msgId = msgIdResult.get
trace "Created new message",
msg = shortLog(msg), peers = peers.len, topic, msgId

View File

@@ -186,16 +186,16 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
if s[].len == 0:
g.peersInIP.del(pubSubPeer.address.get())
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, pubSubPeer)
# also try to remove from explicit table here
g.explicit.removePeer(t, pubSubPeer)
for t in toSeq(g.mesh.keys):
trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score
g.pruned(pubSubPeer, t)
g.mesh.removePeer(t, pubSubPeer)
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, pubSubPeer)
# also try to remove from explicit table here
g.explicit.removePeer(t, pubSubPeer)
for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, pubSubPeer)
@@ -237,9 +237,14 @@ proc handleSubscribe*(g: GossipSub,
else:
trace "peer unsubscribed from topic"
if g.mesh.hasPeer(topic, peer):
#against spec
g.mesh.removePeer(topic, peer)
g.pruned(peer, topic)
# unsubscribe remote peer from the topic
g.gossipsub.removePeer(topic, peer)
g.mesh.removePeer(topic, peer)
g.fanout.removePeer(topic, peer)
if peer.peerId in g.parameters.directPeers:
g.explicit.removePeer(topic, peer)
@@ -357,8 +362,16 @@ method rpcHandler*(g: GossipSub,
for i in 0..<rpcMsg.messages.len(): # for every message
template msg: untyped = rpcMsg.messages[i]
let msgIdResult = g.msgIdProvider(msg)
if msgIdResult.isErr:
debug "Dropping message due to failed message id generation",
error = msgIdResult.error
# TODO: descore peers due to error during message validation (malicious?)
continue
let
msgId = g.msgIdProvider(msg)
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
# addSeen adds salt to msgId to avoid
@@ -469,13 +482,23 @@ method publish*(g: GossipSub,
if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic))
else: # not subscribed, send to fanout peers
# try optimistically
peers.incl(g.fanout.getOrDefault(topic))
if peers.len == 0:
# ok we had nothing.. let's try replenish inline
if peers.len < g.parameters.dLow and g.parameters.floodPublish == false:
# not subscribed or bad mesh, send to fanout peers
# disable for floodPublish, since we already sent to every good peer
#
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
if fanoutPeers.len == 0:
g.replenishFanout(topic)
peers.incl(g.fanout.getOrDefault(topic))
fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
g.rng.shuffle(fanoutPeers)
if fanoutPeers.len + peers.len > g.parameters.d:
fanoutPeers.setLen(g.parameters.d - peers.len)
for fanPeer in fanoutPeers:
peers.incl(fanPeer)
if peers.len > g.parameters.d: break
# even if we couldn't publish,
# we still attempted to publish
@@ -500,7 +523,15 @@ method publish*(g: GossipSub,
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
msgId = g.msgIdProvider(msg)
msgIdResult = g.msgIdProvider(msg)
if msgIdResult.isErr:
trace "Error generating message id, skipping publish",
error = msgIdResult.error
libp2p_gossipsub_failed_publish.inc()
return 0
let msgId = msgIdResult.get
logScope: msgId = shortLog(msgId)

View File

@@ -14,7 +14,7 @@ import chronos, chronicles, metrics
import "."/[types, scoring]
import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]
import "../rpc"/[messages]
import "../../.."/[peerid, multiaddress, utility, switch]
import "../../.."/[peerid, multiaddress, utility, switch, routing_record, signed_envelope]
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
@@ -83,8 +83,16 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:
x.score >= 0.0
# by spec, larger than Dhi, but let's put some hard caps
peers.setLen(min(peers.len, g.parameters.dHigh * 2))
let sprBook = g.switch.peerStore.signedPeerRecordBook
peers.map do (x: PubSubPeer) -> PeerInfoMsg:
PeerInfoMsg(peerId: x.peerId.getBytes())
PeerInfoMsg(
peerId: x.peerId,
signedPeerRecord:
if x.peerId in sprBook:
sprBook.get(x.peerId).encode().get(default(seq[byte]))
else:
default(seq[byte])
)
proc handleGraft*(g: GossipSub,
peer: PubSubPeer,
@@ -165,6 +173,29 @@ proc handleGraft*(g: GossipSub,
return prunes
proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRecord])] =
var routingRecords: seq[(PeerId, Option[PeerRecord])]
for record in prune.peers:
let peerRecord =
if record.signedPeerRecord.len == 0:
none(PeerRecord)
else:
let signedRecord = SignedPeerRecord.decode(record.signedPeerRecord)
if signedRecord.isErr:
trace "peer sent invalid SPR", peer, error=signedRecord.error
none(PeerRecord)
else:
if record.peerID != signedRecord.get().data.peerId:
trace "peer sent envelope with wrong public key", peer
none(PeerRecord)
else:
some(signedRecord.get().data)
routingRecords.add((record.peerId, peerRecord))
routingRecords
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} =
for prune in prunes:
let topic = prune.topicID
@@ -190,9 +221,12 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
g.pruned(peer, topic, setBackoff = false)
g.mesh.removePeer(topic, peer)
# TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
# another option could be to implement signed peer records
## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0:
if peer.score > g.parameters.gossipThreshold and prune.peers.len > 0 and
g.routingRecordsHandler.len > 0:
let routingRecords = prune.getPeers(peer)
for handler in g.routingRecordsHandler:
handler(peer.peerId, topic, routingRecords)
proc handleIHave*(g: GossipSub,
peer: PubSubPeer,
@@ -489,9 +523,11 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} =
logScope: topic
trace "about to replenish fanout"
let currentMesh = g.mesh.getOrDefault(topic)
if g.fanout.peers(topic) < g.parameters.dLow:
trace "replenishing fanout", peers = g.fanout.peers(topic)
for peer in g.gossipsub.getOrDefault(topic):
if peer in currentMesh: continue
if g.fanout.addPeer(topic, peer):
if g.fanout.peers(topic) == g.parameters.d:
break

View File

@@ -142,6 +142,13 @@ type
BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageID, HashSet[PubSubPeer]]
RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]]
RoutingRecordsHandler* =
proc(peer: PeerId,
tag: string, # For gossipsub, the topic
peers: seq[RoutingRecordsPair])
{.gcsafe, raises: [Defect].}
GossipSub* = ref object of FloodSub
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
@@ -153,7 +160,7 @@ type
control*: Table[string, ControlMessage] # pending control messages
mcache*: MCache # messages cache
validationSeen*: ValidationSeenTable # peers who sent us message in validation
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
heartbeatRunning*: bool
peerStats*: Table[PeerId, PeerStats]
@@ -161,6 +168,7 @@ type
topicParams*: Table[string, TopicParams]
directPeersLoop*: Future[void]
peersInIP*: Table[MultiAddress, HashSet[PeerId]]
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
heartbeatEvents*: seq[AsyncEvent]
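
A sketch of hooking into peer exchange through the new callback, mirroring the registration in the test at the end of this comparison; `gossip` is an assumed `GossipSub` instance:

    gossip.routingRecordsHandler.add(
      proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
        # Each pair holds the exchanged PeerId plus, when the sender
        # included a valid SPR, the decoded PeerRecord.
        for (id, record) in peers:
          if record.isSome:
            echo "PX for topic ", tag, ": ", id, " via ", peer
    )
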

View File

@@ -11,7 +11,8 @@
import std/[tables, sequtils, sets, strutils]
import chronos, chronicles, metrics, bearssl
import ./pubsubpeer,
import ./errors as pubsub_errors,
./pubsubpeer,
./rpc/[message, messages, protobuf],
../../switch,
../protocol,
@@ -29,6 +30,7 @@ export results
export PubSubPeer
export PubSubObserver
export protocol
export pubsub_errors
logScope:
topics = "libp2p pubsub"
@@ -76,16 +78,13 @@ type
TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
ValidationResult* {.pure.} = enum
Accept, Reject, Ignore
ValidatorHandler* = proc(topic: string,
message: Message): Future[ValidationResult] {.gcsafe, raises: [Defect].}
TopicPair* = tuple[topic: string, handler: TopicHandler]
MsgIdProvider* =
proc(m: Message): MessageID {.noSideEffect, raises: [Defect], gcsafe.}
proc(m: Message): Result[MessageID, ValidationResult] {.noSideEffect, raises: [Defect], gcsafe.}
SubscriptionValidator* =
proc(topic: string): bool {.raises: [Defect], gcsafe.}
@@ -452,6 +451,11 @@ proc subscribe*(p: PubSub,
## on every received message
##
# Check that this is an allowed topic
if p.subscriptionValidator != nil and p.subscriptionValidator(topic) == false:
warn "Trying to subscribe to a topic not passing validation!", topic
return
p.topics.withValue(topic, handlers) do:
# Already subscribed, just adding another handler
handlers[].add(handler)
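
A sketch of the corresponding hook: a `SubscriptionValidator` that only admits topics under a fixed prefix (how it is wired into the pubsub constructor is assumed):

    import std/strutils

    proc onlyFooTopics(topic: string): bool {.gcsafe, raises: [Defect].} =
      # Admit only topics in our namespace; for anything else subscribe()
      # warns and returns without registering the handler.
      topic.startsWith("/foo/")
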

View File

@@ -16,9 +16,10 @@ import ./messages,
../../../peerid,
../../../peerinfo,
../../../crypto/crypto,
../../../protobuf/minprotobuf
../../../protobuf/minprotobuf,
../../../protocols/pubsub/errors
export messages
export errors, messages
logScope:
topics = "pubsubmessage"
@@ -28,16 +29,12 @@ const PubSubPrefix = toBytes("libp2p-pubsub:")
declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages")
declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
func defaultMsgIdProvider*(m: Message): MessageID =
let mid =
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
byteutils.toHex(m.seqno) & $m.fromPeer
else:
# This part is irrelevant because it's not standard,
# We use it exclusively for testing basically and users should
# implement their own logic in the case they use anonymization
$m.data.hash & $m.topicIDs.hash
mid.toBytes()
func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] =
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
let mid = byteutils.toHex(m.seqno) & $m.fromPeer
ok mid.toBytes()
else:
err ValidationResult.Reject
proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes())

View File

@@ -10,14 +10,17 @@
{.push raises: [Defect].}
import options, sequtils
import ../../../utility
import ../../../peerid
import "../../.."/[
peerid,
routing_record,
utility
]
export options
type
PeerInfoMsg* = object
peerId*: seq[byte]
peerId*: PeerId
signedPeerRecord*: seq[byte]
SubOpts* = object

View File

@@ -11,7 +11,7 @@
{.push raises: [Defect].}
import std/[sequtils, times]
import std/[sequtils, times, sugar]
import pkg/stew/[results, byteutils]
import
multiaddress,
@@ -22,11 +22,6 @@ import
export peerid, multiaddress, signed_envelope
## Constants relating to signed peer records
const
EnvelopeDomain = multiCodec("libp2p-peer-record") # envelope domain as per RFC0002
EnvelopePayloadType= @[(byte) 0x03, (byte) 0x01] # payload_type for routing records as spec'ed in RFC0003
type
AddressInfo* = object
address*: MultiAddress
@@ -76,8 +71,9 @@ proc encode*(record: PeerRecord): seq[byte] =
proc init*(T: typedesc[PeerRecord],
peerId: PeerId,
seqNo: uint64,
addresses: seq[MultiAddress]): T =
addresses: seq[MultiAddress],
seqNo = getTime().toUnix().uint64 # follows the recommended implementation, using unix epoch as seq no.
): T =
PeerRecord(
peerId: peerId,
@@ -87,39 +83,13 @@ proc init*(T: typedesc[PeerRecord],
## Functions related to signed peer records
type SignedPeerRecord* = SignedPayload[PeerRecord]
proc init*(T: typedesc[Envelope],
privateKey: PrivateKey,
peerRecord: PeerRecord): Result[Envelope, CryptoError] =
## Init a signed envelope wrapping a peer record
proc payloadDomain*(T: typedesc[PeerRecord]): string = $multiCodec("libp2p-peer-record")
proc payloadType*(T: typedesc[PeerRecord]): seq[byte] = @[(byte) 0x03, (byte) 0x01]
let envelope = ? Envelope.init(privateKey,
EnvelopePayloadType,
peerRecord.encode(),
$EnvelopeDomain)
ok(envelope)
proc init*(T: typedesc[Envelope],
peerId: PeerId,
addresses: seq[MultiAddress],
privateKey: PrivateKey): Result[Envelope, CryptoError] =
## Creates a signed peer record for this peer:
## a peer routing record according to https://github.com/libp2p/specs/blob/500a7906dd7dd8f64e0af38de010ef7551fd61b6/RFC/0003-routing-records.md
## in a signed envelope according to https://github.com/libp2p/specs/blob/500a7906dd7dd8f64e0af38de010ef7551fd61b6/RFC/0002-signed-envelopes.md
# First create a peer record from the peer info
let peerRecord = PeerRecord.init(peerId,
getTime().toUnix().uint64, # This currently follows the recommended implementation, using unix epoch as seq no.
addresses)
let envelope = ? Envelope.init(privateKey,
peerRecord)
ok(envelope)
proc getSignedPeerRecord*(pb: ProtoBuffer, field: int,
value: var Envelope): ProtoResult[bool] {.
inline.} =
getField(pb, field, value, $EnvelopeDomain)
proc checkValid*(spr: SignedPeerRecord): Result[void, EnvelopeError] =
if not spr.data.peerId.match(spr.envelope.publicKey):
err(EnvelopeInvalidSignature)
else:
ok()
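
Putting the refactored pieces together, a sketch of creating, encoding and re-validating a record; `key`, `peerId` and `addrs` are assumed in scope, as in the `PeerInfo.new` change earlier in this comparison:

    let
      record = PeerRecord.init(peerId, addrs)        # seqNo defaults to unix time
      spr = SignedPeerRecord.init(key, record).tryGet()
      wire = spr.encode().tryGet()                   # protobuf-encoded signed envelope
      back = SignedPeerRecord.decode(wire).tryGet()  # checks payload type and peer id
    assert back.data.peerId == peerId
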

View File

@@ -11,6 +11,7 @@
{.push raises: [Defect].}
import std/sugar
import pkg/stew/[results, byteutils]
import multicodec,
crypto/crypto,
@@ -23,7 +24,8 @@ type
EnvelopeError* = enum
EnvelopeInvalidProtobuf,
EnvelopeFieldMissing,
EnvelopeInvalidSignature
EnvelopeInvalidSignature,
EnvelopeWrongType
Envelope* = object
publicKey*: PublicKey
@@ -116,3 +118,52 @@ proc getField*(pb: ProtoBuffer, field: int,
ok(true)
else:
err(ProtoError.IncorrectBlob)
type
SignedPayload*[T] = object
# T needs to have .encode(), .decode(), .payloadType(), .payloadDomain()
envelope*: Envelope
data*: T
proc init*[T](_: typedesc[SignedPayload[T]],
privateKey: PrivateKey,
data: T): Result[SignedPayload[T], CryptoError] =
mixin encode
let envelope = ? Envelope.init(privateKey,
T.payloadType(),
data.encode(),
T.payloadDomain)
ok(SignedPayload[T](data: data, envelope: envelope))
proc getField*[T](pb: ProtoBuffer, field: int,
value: var SignedPayload[T]): ProtoResult[bool] {.
inline.} =
if not ? getField(pb, field, value.envelope, T.payloadDomain):
ok(false)
else:
mixin decode
value.data = ? T.decode(value.envelope.payload).mapErr(x => ProtoError.IncorrectBlob)
ok(true)
proc decode*[T](
_: typedesc[SignedPayload[T]],
buffer: seq[byte]
): Result[SignedPayload[T], EnvelopeError] =
let
envelope = ? Envelope.decode(buffer, T.payloadDomain)
data = ? T.decode(envelope.payload).mapErr(x => EnvelopeInvalidProtobuf)
signedPayload = SignedPayload[T](envelope: envelope, data: data)
if envelope.payloadType != T.payloadType:
return err(EnvelopeWrongType)
when compiles(? signedPayload.checkValid()):
? signedPayload.checkValid()
ok(signedPayload)
proc encode*[T](msg: SignedPayload[T]): Result[seq[byte], CryptoError] =
msg.envelope.encode()
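
The generic is an implicit concept: to wrap a type `T` in a signed envelope it must provide the four operations listed in the comment above. A hypothetical minimal payload, for illustration only:

    type Blob = object
      bytes: seq[byte]

    proc payloadDomain*(T: typedesc[Blob]): string = "my-app-blob"       # made-up domain
    proc payloadType*(T: typedesc[Blob]): seq[byte] = @[byte 0xff, 0x01] # made-up type
    proc encode*(b: Blob): seq[byte] = b.bytes
    proc decode*(T: typedesc[Blob], buf: seq[byte]): Result[Blob, EnvelopeError] =
      ok(Blob(bytes: buf))

With these in place, `SignedPayload[Blob].init(privateKey, blob)`, `.encode()` and `SignedPayload[Blob].decode(bytes)` work exactly as they do for `PeerRecord`.
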

View File

@@ -0,0 +1,225 @@
## Nim-LibP2P
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
# Could be in chronos
import std/[strformat, sequtils]
import stew/byteutils
import chronos, chronicles
## Sync part
##
## Ring buffer where writep == readp means empty.
## The classic convention of writep + 1 == readp meaning full would waste
## a slot of the buf, so a separate `full` flag is used instead.
##
## Instead of doing clever modulos, ifs are used for clarity
type
RingBuffer*[T] = object
buf: seq[T] # Data store
readp: int # Reading position
writep: int # Writing position
full: bool
func isFull*[T](rb: RingBuffer[T]): bool =
rb.full
func isEmpty*[T](rb: RingBuffer[T]): bool =
rb.writep == rb.readp and not rb.full
func len*[T](rb: RingBuffer[T]): int =
if rb.full: rb.buf.len
elif rb.writep < rb.readp: rb.writep + rb.buf.len - rb.readp
else: rb.writep - rb.readp
proc write*[T](rb: var RingBuffer[T], data: openArray[T]): int =
## Add data
let
readPosRelative =
if rb.readp < rb.writep: rb.readp + rb.buf.len
else: rb.readp
canAddBeforeFull =
if rb.isEmpty: min(rb.buf.len, data.len)
else: min(readPosRelative - rb.writep, data.len)
canAddBeforeSplit = min(rb.buf.len - rb.writep, data.len)
canAdd = min(canAddBeforeFull, canAddBeforeSplit)
copyMem(addr rb.buf[rb.writep], unsafeAddr data[0], canAdd)
rb.writep += canAdd
if canAdd < canAddBeforeFull:
let splittedCount = canAddBeforeFull - canAddBeforeSplit
copyMem(addr rb.buf[0], unsafeAddr data[canAdd], splittedCount)
rb.writep = splittedCount
if rb.writeP == rb.buf.len: rb.writeP = 0
if rb.writeP == rb.readP: rb.full = true
canAddBeforeFull
proc read_internal[T](rb: var RingBuffer[T], buffer: var openArray[T]): int =
## This will chunk at the end of the circular buffer
## to avoid allocations
let
continuousInBuffer =
if rb.writep < rb.readp or rb.full: rb.buf.len - rb.readp
else: rb.writep - rb.readp
canRead = min(buffer.len, continuousInBuffer)
copyMem(unsafeAddr buffer[0], addr rb.buf[rb.readp], canRead)
rb.readp += canRead
if rb.readp == rb.buf.len: rb.readp = 0
rb.full = false
canRead
proc read*[T](rb: var RingBuffer[T], buffer: var openArray[T]): int =
rb.read_internal(buffer)
proc readAll*[T](rb: var RingBuffer[T]): seq[T] =
result = newSeq[T](rb.len())
let readFirst = rb.read_internal(result)
if readFirst < result.len:
discard rb.read_internal(result.toOpenArray(readFirst, result.len - 1))
proc init*[T](R: typedesc[RingBuffer[T]], size: int): R =
RingBuffer[T](
buf: newSeq[T](size)
)
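
A sketch of the intended semantics, as the code stands (the commit message itself flags this module as unfinished and untested):

    var rb = RingBuffer[byte].init(4)
    doAssert rb.write(@[1'u8, 2, 3]) == 3   # all three bytes fit
    doAssert rb.len == 3
    var tmp = newSeq[byte](2)
    doAssert rb.read(tmp) == 2              # tmp is now @[1'u8, 2]
    doAssert rb.write(@[4'u8, 5, 6]) == 3   # wraps around the end of buf
    doAssert rb.isFull
    doAssert rb.readAll() == @[3'u8, 4, 5, 6]
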
when isMainModule:
var totalI = 0
proc getSeq(i: int): seq[byte] =
for x in 0..<i: result.add((totalI + x).byte)
totalI.inc(i)
#var rb = RingBuffer[byte].init(12)
#var buffer = newSeq[byte](30)
#echo "Sync"
#echo rb.write(getSeq(3))
#echo rb.write(getSeq(6))
#echo rb.readAll()
#echo rb.write(getSeq(6))
#echo rb
#echo rb.write(getSeq(6))
#echo rb.readAll()
#echo rb.write(getSeq(30))
#echo rb
## Async part
type
AsyncRingBuffer*[T] = ref object
ring: RingBuffer[T]
#TODO handle a single getter
getters: seq[Future[void]]
putters: seq[Future[void]]
# Borrows
func isFull*[T](rb: AsyncRingBuffer[T]): bool = rb.ring.isFull()
func isEmpty*[T](rb: AsyncRingBuffer[T]): bool = rb.ring.isEmpty()
func len*[T](rb: AsyncRingBuffer[T]): int = rb.ring.len()
# Stolen from chronos
proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} =
var i = 0
while i < len(waiters):
var waiter = waiters[i]
inc(i)
if not(waiter.finished()):
waiter.complete()
break
if i > 0:
waiters.delete(0, i - 1)
#proc write*[T](rb: AsyncRingBuffer[T], data: seq[T]) {.async.} =
# # First, get a slot
# while rb.isFull():
# var putter = newFuture[void]("RingBuffer.write")
# rb.putters.add(putter)
# await putter
#
# # Now we must write everything without getting our slot stolen
# var written = rb.ring.write(data)
# rb.getters.wakeUpNext()
#
# while written < data.len:
# var putter = newFuture[void]("RingBuffer.write")
# # We have priority over queued writers
# rb.putters.insert(putter, 0)
# await putter
#
# written += rb.ring.write(data.toOpenArray(written, data.len - 1))
# rb.getters.wakeUpNext()
#
# if not rb.isFull(): #Room for the next one
# rb.putters.wakeUpNext()
proc write*[T](rb: AsyncRingBuffer[T], data: ptr UncheckedArray[T], size: int) {.async.} =
# First, get a slot
while rb.isFull():
var putter = newFuture[void]("RingBuffer.write")
rb.putters.add(putter)
await putter
# Now we must write everything without getting our slot stolen
var written = rb.ring.write(toOpenArray(data, 0, size - 1))
rb.getters.wakeUpNext()
while written < size:
var putter = newFuture[void]("RingBuffer.write")
# We have priority over queued writers
rb.putters.insert(putter, 0)
await putter
written += rb.ring.write(data.toOpenArray(written, size - 1))
rb.getters.wakeUpNext()
if not rb.isFull(): #Room for the next one
rb.putters.wakeUpNext()
proc read*[T](rb: AsyncRingBuffer[T], maxRead: int = 10000): Future[seq[T]] {.async.} =
while rb.isEmpty():
var getter = newFuture[void]("RingBuffer.read")
rb.getters.add(getter)
await getter
let res = rb.ring.readAll()
rb.putters.wakeUpNext()
# Since we read everything, we won't wake up other readers
return res
proc new*[T](R: typedesc[AsyncRingBuffer[T]], size: int): R =
AsyncRingBuffer[T](
ring: RingBuffer[T].init(size)
)
when isMainModule:
proc testA {.async.} =
var rb = AsyncRingBuffer[byte].new(3)
#await rb.write(getSeq(6))
let toSend = getSeq(30)
let tooBigWrite = rb.write(cast[ptr UncheckedArray[byte]](unsafeAddr toSend[0]), 30)# and rb.write(UncheckedArray(getSeq(70)), 70)
while rb.len > 0 or not tooBigWrite.finished:
let reader = rb.read()
await reader or tooBigWrite
if reader.finished:
echo await reader
else:
await reader.cancelAndWait()
echo "Async"
waitFor(testA())

View File

@@ -99,8 +99,9 @@ proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe.} =
method connect*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress]): Future[void] =
s.dialer.connect(peerId, addrs)
addrs: seq[MultiAddress],
forceDial = false): Future[void] =
s.dialer.connect(peerId, addrs, forceDial)
method dial*(
s: Switch,
@@ -117,8 +118,9 @@ method dial*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string]): Future[Connection] =
s.dialer.dial(peerId, addrs, protos)
protos: seq[string],
forceDial = false): Future[Connection] =
s.dialer.dial(peerId, addrs, protos, forceDial)
proc dial*(
s: Switch,

View File

@@ -30,6 +30,8 @@ export transport, websock
const
WsTransportTrackerName* = "libp2p.wstransport"
DefaultHeadersTimeout = 3.seconds
type
WsStream = ref object of Connection
session: WSSession
@@ -49,11 +51,24 @@ proc new*(T: type WsStream,
stream.initStream()
return stream
template mapExceptions(body: untyped) =
try:
body
except AsyncStreamIncompleteError:
raise newLPStreamEOFError()
except AsyncStreamUseClosedError:
raise newLPStreamEOFError()
except WSClosedError:
raise newLPStreamEOFError()
except AsyncStreamLimitError:
raise newLPStreamLimitError()
method readOnce*(
s: WsStream,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
let res = await s.session.recv(pbytes, nbytes)
let res = mapExceptions(await s.session.recv(pbytes, nbytes))
if res == 0 and s.session.readyState == ReadyState.Closed:
raise newLPStreamEOFError()
return res
@@ -61,10 +76,7 @@ method readOnce*(
method write*(
s: WsStream,
msg: seq[byte]): Future[void] {.async.} =
try:
await s.session.send(msg, Opcode.Binary)
except WSClosedError:
raise newLPStreamEOFError()
mapExceptions(await s.session.send(msg, Opcode.Binary))
method closeImpl*(s: WsStream): Future[void] {.async.} =
await s.session.close()
@@ -82,6 +94,7 @@ type
tlsCertificate: TLSCertificate
tlsFlags: set[TLSFlags]
flags: set[ServerFlags]
handshakeTimeout: Duration
factories: seq[ExtFactory]
rng: Rng
@@ -121,9 +134,13 @@ method start*(
address = ma.initTAddress().tryGet(),
tlsPrivateKey = self.tlsPrivateKey,
tlsCertificate = self.tlsCertificate,
flags = self.flags)
flags = self.flags,
handshakeTimeout = self.handshakeTimeout)
else:
HttpServer.create(ma.initTAddress().tryGet())
HttpServer.create(
ma.initTAddress().tryGet(),
handshakeTimeout = self.handshakeTimeout
)
self.httpservers &= httpserver
@@ -212,19 +229,19 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
if not self.running:
raise newTransportClosedError()
if self.acceptFuts.len <= 0:
self.acceptFuts = self.httpservers.mapIt(it.accept())
if self.acceptFuts.len <= 0:
return
let
finished = await one(self.acceptFuts)
index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.httpservers[index].accept()
try:
if self.acceptFuts.len <= 0:
self.acceptFuts = self.httpservers.mapIt(it.accept())
if self.acceptFuts.len <= 0:
return
let
finished = await one(self.acceptFuts)
index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.httpservers[index].accept()
let req = await finished
try:
@@ -240,6 +257,8 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
debug "OS Error", exc = exc.msg
except WebSocketError as exc:
debug "Websocket Error", exc = exc.msg
except HttpError as exc:
debug "Http Error", exc = exc.msg
except AsyncStreamError as exc:
debug "AsyncStream Error", exc = exc.msg
except TransportTooManyError as exc:
@@ -291,7 +310,8 @@ proc new*(
tlsFlags: set[TLSFlags] = {},
flags: set[ServerFlags] = {},
factories: openArray[ExtFactory] = [],
rng: Rng = nil): T =
rng: Rng = nil,
handshakeTimeout = DefaultHeadersTimeout): T =
T(
upgrader: upgrade,
@@ -300,14 +320,16 @@ proc new*(
tlsFlags: tlsFlags,
flags: flags,
factories: @factories,
rng: rng)
rng: rng,
handshakeTimeout: handshakeTimeout)
proc new*(
T: typedesc[WsTransport],
upgrade: Upgrade,
flags: set[ServerFlags] = {},
factories: openArray[ExtFactory] = [],
rng: Rng = nil): T =
rng: Rng = nil,
handshakeTimeout = DefaultHeadersTimeout): T =
T.new(
upgrade = upgrade,
@@ -315,4 +337,5 @@ proc new*(
tlsCertificate = nil,
flags = flags,
factories = @factories,
rng = rng)
rng = rng,
handshakeTimeout = handshakeTimeout)
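
A sketch of picking a non-default timeout at construction; the `Upgrade` instance is assumed to come from the usual switch wiring:

    let wsTransport = WsTransport.new(
      upgrade = muxedUpgrade,
      handshakeTimeout = 5.seconds)  # default is DefaultHeadersTimeout (3 seconds)
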

View File

@@ -54,16 +54,21 @@ proc acquire*(s: AsyncSemaphore): Future[void] =
fut.cancelCallback = nil
if not fut.finished:
s.queue.keepItIf( it != fut )
s.count.inc
fut.cancelCallback = cancellation
s.queue.add(fut)
s.count.dec
trace "Queued slot", available = s.count, queue = s.queue.len
return fut
proc forceAcquire*(s: AsyncSemaphore) =
## ForceAcquire will always succeed,
## creating a temporary slot if required.
## The temporary slot stays in use until
## enough `release`s have paid it back
s.count.dec
proc release*(s: AsyncSemaphore) =
## Release a resource from the semaphore,
## by picking the first future from the queue
@@ -77,13 +82,15 @@ proc release*(s: AsyncSemaphore) =
trace "Releasing slot", available = s.count,
queue = s.queue.len
if s.queue.len > 0:
s.count.inc
while s.queue.len > 0:
var fut = s.queue[0]
s.queue.delete(0)
if not fut.finished():
s.count.dec
fut.complete()
break
s.count.inc # increment the resource count
trace "Released slot", available = s.count,
queue = s.queue.len
return
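
A sketch of the forced-slot accounting; `sema` is an assumed `AsyncSemaphore` with a single slot, construction elided:

    await sema.acquire()   # takes the regular slot
    sema.forceAcquire()    # always succeeds: count goes negative
    sema.release()         # pays back the temporary slot
    sema.release()         # frees the regular slot again
    await sema.acquire()   # completes immediately
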

View File

@@ -137,6 +137,10 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
await transport1.stop()
asyncTest "e2e should allow multiple local addresses":
when defined(windows):
# this randomly locks the Windows CI job
skip()
return
let addrs = @[MultiAddress.init(ma).tryGet(),
MultiAddress.init(ma).tryGet()]

View File

@@ -20,6 +20,7 @@ import utils,
protocols/pubsub/floodsub,
protocols/pubsub/rpc/messages,
protocols/pubsub/peertable]
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
import ../helpers

View File

@@ -39,6 +39,8 @@ proc randomPeerId(): PeerId =
except CatchableError as exc:
raise newException(Defect, exc.msg)
const MsgIdSuccess = "msg id gen success"
suite "GossipSub internal":
teardown:
checkTrackers()
@@ -308,7 +310,7 @@ suite "GossipSub internal":
conn.peerId = peerId
inc seqno
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
check gossipSub.fanout[topic].len == 15
check gossipSub.mesh[topic].len == 15
@@ -355,7 +357,7 @@ suite "GossipSub internal":
conn.peerId = peerId
inc seqno
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let peers = gossipSub.getGossipPeers()
check peers.len == gossipSub.parameters.d
@@ -396,7 +398,7 @@ suite "GossipSub internal":
conn.peerId = peerId
inc seqno
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let peers = gossipSub.getGossipPeers()
check peers.len == gossipSub.parameters.d
@@ -437,7 +439,7 @@ suite "GossipSub internal":
conn.peerId = peerId
inc seqno
let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
let peers = gossipSub.getGossipPeers()
check peers.len == 0
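
With the provider now fallible, the tests unwrap it via expect. A hedged sketch of a custom provider with the new Result-based signature, mirroring the default one but rejecting anonymous messages outright (strictMsgIdProvider is illustrative, import paths assumed):

import stew/[byteutils, results]
import libp2p/protocols/pubsub/errors,
       libp2p/protocols/pubsub/rpc/messages

proc strictMsgIdProvider(m: Message): Result[MessageID, ValidationResult] =
  # Reject instead of guessing when the fields the id is built from are missing.
  if m.seqno.len == 0 or m.fromPeer.data.len == 0:
    return err(ValidationResult.Reject)
  ok((byteutils.toHex(m.seqno) & $m.fromPeer).toBytes())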

View File

@@ -24,6 +24,7 @@ import utils, ../../libp2p/[errors,
protocols/pubsub/peertable,
protocols/pubsub/timedcache,
protocols/pubsub/rpc/messages]
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
import ../helpers
proc `$`(peer: PubSubPeer): string = shortLog(peer)
@@ -564,6 +565,72 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
check observed == 2
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
var passed = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
passed.complete()
let
nodes = generateNodes(
2,
gossip = true,
unsubscribeBackoff = 10.minutes)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
nodes[1].subscribe("foobar", handler)
nodes[0].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
await waitSub(nodes[1], nodes[0], "foobar")
nodes[0].unsubscribe("foobar", handler)
let gsNode = GossipSub(nodes[1])
check await checkExpiring(gsNode.mesh.getOrDefault("foobar").len == 0)
nodes[0].subscribe("foobar", handler)
check GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
check:
GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
await passed.wait(2.seconds)
trace "test done, stopping..."
await nodes[0].stop()
await nodes[1].stop()
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub send over mesh A -> B":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@@ -865,3 +932,75 @@ suite "GossipSub":
it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "e2e - GossipSub peer exchange":
# A, B & C are subscribed to something
# B unsubscribes from it; it should send
# PX to A & C
#
# C sent its SPR, A did not
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard # not used in this test
let
nodes = generateNodes(
2,
gossip = true) &
generateNodes(1, gossip = true, sendSignedPeerRecord = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
nodes[2].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
nodes[2].start(),
))
var
gossip0 = GossipSub(nodes[0])
gossip1 = GossipSub(nodes[1])
gossip2 = GossipSub(nodes[2])
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
nodes[2].subscribe("foobar", handler)
for x in 0..<3:
for y in 0..<3:
if x != y:
await waitSub(nodes[x], nodes[y], "foobar")
var passed: Future[void] = newFuture[void]()
gossip0.routingRecordsHandler.add(proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
check:
tag == "foobar"
peers.len == 2
peers[0].record.isSome() xor peers[1].record.isSome()
passed.complete()
)
nodes[1].unsubscribe("foobar", handler)
await passed
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop(),
nodes[2].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop(),
nodes[2].stop()
)
await allFuturesThrowing(nodesFut.concat())
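
Outside the tests, the same hook is how an application consumes PX. A hedged sketch, assuming a running GossipSub instance named gossip:

import options

gossip.routingRecordsHandler.add(
  proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
    # Invoked when `peer` prunes us from `tag`; each pair may carry a
    # signed peer record for dialing the suggested replacement peer.
    for pair in peers:
      if pair.record.isSome():
        echo "PX candidate with signed record for topic ", tag
)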

View File

@@ -147,6 +147,9 @@ suite "GossipSub":
nodes[1].start(),
))
# We must subscribe before setting the validator
nodes[0].subscribe("foobar", handler)
var gossip = GossipSub(nodes[0])
let invalidDetected = newFuture[void]()
gossip.subscriptionValidator =
@@ -162,7 +165,6 @@ suite "GossipSub":
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
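
The validator is a plain predicate over the topic name; returning false blocks the subscription. A hedged whitelist sketch, assuming that signature:

import sets

let allowed = toHashSet(["foobar"])
gossip.subscriptionValidator =
  proc(topic: string): bool {.gcsafe, raises: [Defect].} =
    # Subscriptions (local and remote) to any other topic are refused.
    topic in allowed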

View File

@@ -5,19 +5,21 @@ import stew/byteutils
import ../../libp2p/[peerid,
crypto/crypto,
protocols/pubsub/mcache,
protocols/pubsub/rpc/message,
protocols/pubsub/rpc/messages]
import ./utils
var rng = newRng()
proc randomPeerId(): PeerId =
PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get()
const MsgIdGenSuccess = "msg id generation success"
suite "MCache":
test "put/get":
var mCache = MCache.init(3, 5)
var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes())
let msgId = defaultMsgIdProvider(msg)
let msgId = defaultMsgIdProvider(msg).expect(MsgIdGenSuccess)
mCache.put(msgId, msg)
check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg
@@ -28,13 +30,13 @@ suite "MCache":
var msg = Message(fromPeer: randomPeerId(),
seqno: "12345".toBytes(),
topicIDs: @["foo"])
mCache.put(defaultMsgIdProvider(msg), msg)
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
for i in 0..<5:
var msg = Message(fromPeer: randomPeerId(),
seqno: "12345".toBytes(),
topicIDs: @["bar"])
mCache.put(defaultMsgIdProvider(msg), msg)
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
var mids = mCache.window("foo")
check mids.len == 3
@@ -49,7 +51,7 @@ suite "MCache":
var msg = Message(fromPeer: randomPeerId(),
seqno: "12345".toBytes(),
topicIDs: @["foo"])
mCache.put(defaultMsgIdProvider(msg), msg)
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
mCache.shift()
check mCache.window("foo").len == 0
@@ -58,7 +60,7 @@ suite "MCache":
var msg = Message(fromPeer: randomPeerId(),
seqno: "12345".toBytes(),
topicIDs: @["bar"])
mCache.put(defaultMsgIdProvider(msg), msg)
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
mCache.shift()
check mCache.window("bar").len == 0
@@ -67,7 +69,7 @@ suite "MCache":
var msg = Message(fromPeer: randomPeerId(),
seqno: "12345".toBytes(),
topicIDs: @["baz"])
mCache.put(defaultMsgIdProvider(msg), msg)
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
mCache.shift()
check mCache.window("baz").len == 0
@@ -79,19 +81,19 @@ suite "MCache":
var msg = Message(fromPeer: randomPeerId(),
seqno: "12345".toBytes(),
topicIDs: @["foo"])
mCache.put(defaultMsgIdProvider(msg), msg)
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
for i in 0..<3:
var msg = Message(fromPeer: randomPeerId(),
seqno: "12345".toBytes(),
topicIDs: @["bar"])
mCache.put(defaultMsgIdProvider(msg), msg)
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
for i in 0..<3:
var msg = Message(fromPeer: randomPeerId(),
seqno: "12345".toBytes(),
topicIDs: @["baz"])
mCache.put(defaultMsgIdProvider(msg), msg)
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
mCache.shift()
check mCache.window("foo").len == 0

View File

@@ -3,8 +3,10 @@ import unittest2
{.used.}
import options
import stew/byteutils
import ../../libp2p/[peerid, peerinfo,
crypto/crypto,
protocols/pubsub/errors,
protocols/pubsub/rpc/message,
protocols/pubsub/rpc/messages]
@@ -18,3 +20,56 @@ suite "Message":
msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
check verify(msg)
test "defaultMsgIdProvider success":
let
seqno = 11'u64
pkHex =
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
E731065A"""
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
.expect("valid private key bytes")
peer = PeerInfo.new(seckey)
msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
msgIdResult = msg.defaultMsgIdProvider()
check:
msgIdResult.isOk
string.fromBytes(msgIdResult.get) ==
"000000000000000b12D3KooWGyLzSt9g4U9TdHYDvVWAs5Ht4WrocgoyqPxxvnqAL8qw"
test "defaultMsgIdProvider error - no source peer id":
let
seqno = 11'u64
pkHex =
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
E731065A"""
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
.expect("valid private key bytes")
peer = PeerInfo.new(seckey)
var msg = Message.init(peer.some, @[], "topic", some(seqno), sign = true)
msg.fromPeer = PeerId()
let msgIdResult = msg.defaultMsgIdProvider()
check:
msgIdResult.isErr
msgIdResult.error == ValidationResult.Reject
test "defaultMsgIdProvider error - no source seqno":
let
pkHex =
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
E731065A"""
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
.expect("valid private key bytes")
peer = PeerInfo.new(seckey)
msg = Message.init(some(peer), @[], "topic", uint64.none, sign = true)
msgIdResult = msg.defaultMsgIdProvider()
check:
msgIdResult.isErr
msgIdResult.error == ValidationResult.Reject

View File

@@ -4,33 +4,48 @@ const
libp2p_pubsub_verify {.booldefine.} = true
libp2p_pubsub_anonymize {.booldefine.} = false
import random, tables
import chronos
import hashes, random, tables
import chronos, stew/[byteutils, results]
import ../../libp2p/[builders,
protocols/pubsub/errors,
protocols/pubsub/pubsub,
protocols/pubsub/gossipsub,
protocols/pubsub/floodsub,
protocols/pubsub/rpc/messages,
protocols/secure/secure]
export builders
randomize()
func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] =
let mid =
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
byteutils.toHex(m.seqno) & $m.fromPeer
else:
# This fallback is not part of the spec; it exists only for the tests.
# Users who enable anonymization should supply their own
# msgIdProvider instead of relying on it.
$m.data.hash & $m.topicIDs.hash
ok mid.toBytes()
proc generateNodes*(
num: Natural,
secureManagers: openArray[SecureProtocol] = [
SecureProtocol.Noise
],
msgIdProvider: MsgIdProvider = nil,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
gossip: bool = false,
triggerSelf: bool = false,
verifySignature: bool = libp2p_pubsub_verify,
anonymize: bool = libp2p_pubsub_anonymize,
sign: bool = libp2p_pubsub_sign,
sendSignedPeerRecord = false,
unsubscribeBackoff = 1.seconds,
maxMessageSize: int = 1024 * 1024): seq[PubSub] =
for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers)
let switch = newStandardSwitch(secureManagers = secureManagers, sendSignedPeerRecord = sendSignedPeerRecord)
let pubsub = if gossip:
let g = GossipSub.init(
switch = switch,
@@ -40,7 +55,7 @@ proc generateNodes*(
msgIdProvider = msgIdProvider,
anonymize = anonymize,
maxMessageSize = maxMessageSize,
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubscribeBackoff = 1.seconds; p))
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubscribeBackoff = unsubscribeBackoff; p))
# set some testing params, to enable scores
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0

View File

@@ -463,3 +463,33 @@ suite "Connection Manager":
await connMngr.close()
await allFuturesThrowing(
allFutures(conns.mapIt( it.close() )))
asyncTest "allow force dial":
let connMngr = ConnManager.new(maxConnections = 2)
var conns: seq[Connection]
for i in 0..<3:
let conn = connMngr.trackOutgoingConn(
(proc(): Future[Connection] {.async.} =
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
), true
)
check await conn.withTimeout(10.millis)
conns.add(await conn)
# should throw adding a connection over the limit
expect TooManyConnectionsError:
discard await connMngr.trackOutgoingConn(
(proc(): Future[Connection] {.async.} =
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
), false
)
await connMngr.close()
await allFuturesThrowing(
allFutures(conns.mapIt( it.close() )))
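
The boolean passed to trackOutgoingConn is the force flag. A hedged micro-sketch of the two behaviours the test exercises (connProvider stands in for any proc returning a Future[Connection]):

# With maxConnections already reached:
discard await connMngr.trackOutgoingConn(connProvider, true)   # forced: still tracked
# the unforced variant raises TooManyConnectionsError instead:
# discard await connMngr.trackOutgoingConn(connProvider, false)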

View File

@@ -77,6 +77,7 @@ suite "Identify":
check id.protoVersion.get() == ProtoVersion
check id.agentVersion.get() == AgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
check id.signedPeerRecord.isNone()
asyncTest "custom agent version":
const customAgentVersion = "MY CUSTOM AGENT STRING"
@@ -100,6 +101,7 @@ suite "Identify":
check id.protoVersion.get() == ProtoVersion
check id.agentVersion.get() == customAgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
check id.signedPeerRecord.isNone()
asyncTest "handle failed identify":
msListen.addHandler(IdentifyCodec, identifyProto1)
@@ -123,6 +125,27 @@ suite "Identify":
discard await msDial.select(conn, IdentifyCodec)
discard await identifyProto2.identify(conn, pi2.peerId)
asyncTest "can send signed peer record":
msListen.addHandler(IdentifyCodec, identifyProto1)
identifyProto1.sendSignedPeerRecord = true
serverFut = transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
let c = await transport1.accept()
await msListen.handle(c)
acceptFut = acceptHandler()
conn = await transport2.dial(transport1.addrs[0])
discard await msDial.select(conn, IdentifyCodec)
let id = await identifyProto2.identify(conn, remotePeerInfo.peerId)
check id.pubkey.get() == remoteSecKey.getPublicKey().get()
check id.addrs == ma
check id.protoVersion.get() == ProtoVersion
check id.agentVersion.get() == AgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
check id.signedPeerRecord.get() == remotePeerInfo.signedPeerRecord.get()
suite "handle push identify message":
var
switch1 {.threadvar.}: Switch
@@ -160,6 +183,10 @@ suite "Identify":
switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet()
switch2.peerStore.addressBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.addrs.toHashSet()
#switch1.peerStore.signedPeerRecordBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.signedPeerRecord.get()
#switch2.peerStore.signedPeerRecordBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.signedPeerRecord.get()
# no longer sent by default
proc closeAll() {.async.} =
await conn.close()
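
Because the record is no longer sent by default, both sides must now opt in. A hedged sketch using the builder parameter the pubsub tests rely on:

import libp2p/builders

# Switches built like this exchange signed peer records during identify.
let switch = newStandardSwitch(sendSignedPeerRecord = true)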

View File

@@ -1,10 +1,12 @@
{.used.}
import options, bearssl
import chronos
import chronos, stew/byteutils
import ../libp2p/crypto/crypto,
../libp2p/multicodec,
../libp2p/peerinfo,
../libp2p/peerid
../libp2p/peerid,
../libp2p/routing_record
import ./helpers
@@ -16,3 +18,32 @@ suite "PeerInfo":
check peerId == peerInfo.peerId
check seckey.getPublicKey().get() == peerInfo.publicKey
test "Signed peer record":
const
ExpectedDomain = $multiCodec("libp2p-peer-record")
ExpectedPayloadType = @[(byte) 0x03, (byte) 0x01]
let
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey).get()
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
peerInfo = PeerInfo.new(seckey, multiAddresses)
let
env = peerInfo.signedPeerRecord.get()
rec = PeerRecord.decode(env.payload()).tryGet()
# Check envelope fields
check:
env.publicKey == peerInfo.publicKey
env.domain == ExpectedDomain
env.payloadType == ExpectedPayloadType
# Check payload (routing record)
check:
rec.peerId == peerId
rec.seqNo > 0
rec.addresses.len == 2
rec.addresses[0].address == multiAddresses[0]
rec.addresses[1].address == multiAddresses[1]

View File

@@ -9,7 +9,7 @@ suite "Routing record":
privKey = PrivateKey.random(rng[]).tryGet()
peerId = PeerId.init(privKey).tryGet()
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
routingRecord = PeerRecord.init(peerId, 42, multiAddresses)
routingRecord = PeerRecord.init(peerId, multiAddresses, 42)
buffer = routingRecord.encode()
@@ -36,3 +36,33 @@ suite "Routing record":
$decodedRecord.addresses[0].address == "/ip4/1.2.3.4/tcp/0"
$decodedRecord.addresses[1].address == "/ip4/1.2.3.4/tcp/1"
suite "Signed Routing Record":
test "Encode -> decode test":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
peerId = PeerId.init(privKey).tryGet()
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
routingRecord = SignedPeerRecord.init(privKey, PeerRecord.init(peerId, multiAddresses, 42)).tryGet()
buffer = routingRecord.envelope.encode().tryGet()
parsedRR = SignedPeerRecord.decode(buffer).tryGet().data
check:
parsedRR.peerId == peerId
parsedRR.seqNo == 42
parsedRR.addresses.len == 2
parsedRR.addresses[0].address == multiAddresses[0]
parsedRR.addresses[1].address == multiAddresses[1]
test "Can't use mismatched public key":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
privKey2 = PrivateKey.random(rng[]).tryGet()
peerId = PeerId.init(privKey).tryGet()
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
routingRecord = SignedPeerRecord.init(privKey2, PeerRecord.init(peerId, multiAddresses, 42)).tryGet()
buffer = routingRecord.envelope.encode().tryGet()
check SignedPeerRecord.decode(buffer).error == EnvelopeInvalidSignature

View File

@@ -36,7 +36,7 @@ suite "AsyncSemaphore":
await sema.acquire()
let fut = sema.acquire()
check sema.count == -1
check sema.count == 0
sema.release()
sema.release()
check sema.count == 1
@@ -66,7 +66,7 @@ suite "AsyncSemaphore":
let fut = sema.acquire()
check fut.finished == false
check sema.count == -1
check sema.count == 0
sema.release()
sema.release()
@@ -104,12 +104,20 @@ suite "AsyncSemaphore":
await sema.acquire()
let tmp = sema.acquire()
check not tmp.finished()
let
tmp = sema.acquire()
tmp2 = sema.acquire()
check:
not tmp.finished()
not tmp2.finished()
tmp.cancel()
sema.release()
check tmp2.finished()
sema.release()
check await sema.acquire().withTimeout(10.millis)
asyncTest "should handle out of order cancellations":
@@ -145,3 +153,43 @@ suite "AsyncSemaphore":
sema.release()
check await sema.acquire().withTimeout(10.millis)
asyncTest "should handle forceAcquire properly":
let sema = newAsyncSemaphore(1)
await sema.acquire()
check not(await sema.acquire().withTimeout(1.millis)) # times out: the slot is not acquired and the future is cancelled
let
fut1 = sema.acquire()
fut2 = sema.acquire()
sema.forceAcquire()
sema.release()
await fut1 or fut2 or sleepAsync(1.millis)
check:
fut1.finished()
not fut2.finished()
sema.release()
await fut1 or fut2 or sleepAsync(1.millis)
check:
fut1.finished()
fut2.finished()
sema.forceAcquire()
sema.forceAcquire()
let
fut3 = sema.acquire()
fut4 = sema.acquire()
fut5 = sema.acquire()
sema.release()
sema.release()
await sleepAsync(1.millis)
check:
fut3.finished()
fut4.finished()
not fut5.finished()

View File

@@ -3,7 +3,7 @@ import stew/byteutils
import ../libp2p/[signed_envelope]
suite "Signed envelope":
test "Encode -> decode test":
test "Encode -> decode -> encode -> decode test":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
@@ -12,10 +12,16 @@ suite "Signed envelope":
decodedEnvelope = Envelope.decode(buffer, "domain").tryGet()
wrongDomain = Envelope.decode(buffer, "wdomain")
reencodedEnvelope = decodedEnvelope.encode().tryGet()
redecodedEnvelope = Envelope.decode(reencodedEnvelope, "domain").tryGet()
check:
decodedEnvelope == envelope
wrongDomain.error == EnvelopeInvalidSignature
reencodedEnvelope == buffer
redecodedEnvelope == envelope
test "Interop decode test":
# from https://github.com/libp2p/go-libp2p-core/blob/b18a4c9c5629870bde2cd85ab3b87a507600d411/record/envelope_test.go#L68
let inputData = "0a24080112206f1581709bb7b1ef030d210db18e3b0ba1c776fba65d8cdaad05415142d189f812102f6c69627032702f74657374646174611a0c68656c6c6f20776f726c64212a401178673b51dfa842aad17e465e25d646ad16628916b964c3fb10c711fee87872bdd4e4646f58c277cdff09704913d8be1aec6322de8d3d0bb852120374aece08".hexToSeqByte()
@@ -28,3 +34,56 @@ suite "Signed envelope":
# same as above, but payload altered
let inputData = "0a24080112206f1581709bb7b1ef030d210db18e3b0ba1c776fba65d8cdaad05415142d189f812102f6c69627032702f74657374646174611a0c00006c6c6f20776f726c64212a401178673b51dfa842aad17e465e25d646ad16628916b964c3fb10c711fee87872bdd4e4646f58c277cdff09704913d8be1aec6322de8d3d0bb852120374aece08".hexToSeqByte()
check Envelope.decode(inputData, "libp2p-testing").error == EnvelopeInvalidSignature
# the type and its procs must be exported for SignedPayload to resolve them
type
DummyPayload* = object
awesome: byte
SignedDummy = SignedPayload[DummyPayload]
proc decode*(T: typedesc[DummyPayload], buffer: seq[byte]): Result[DummyPayload, cstring] =
ok(DummyPayload(awesome: buffer[0]))
proc encode*(pd: DummyPayload): seq[byte] =
@[pd.awesome]
proc checkValid*(pd: SignedDummy): Result[void, EnvelopeError] =
if pd.data.awesome == 12.byte: ok()
else: err(EnvelopeInvalidSignature)
proc payloadDomain*(T: typedesc[DummyPayload]): string = "dummy"
proc payloadType*(T: typedesc[DummyPayload]): seq[byte] = @[(byte) 0x00, (byte) 0x00]
suite "Signed payload":
test "Simple encode -> decode":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
dummyPayload = DummyPayload(awesome: 12.byte)
signed = SignedDummy.init(privKey, dummyPayload).tryGet()
encoded = signed.encode().tryGet()
decoded = SignedDummy.decode(encoded).tryGet()
check:
dummyPayload.awesome == decoded.data.awesome
decoded.envelope.publicKey == privKey.getPublicKey().tryGet()
test "Invalid payload":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
dummyPayload = DummyPayload(awesome: 30.byte)
signed = SignedDummy.init(privKey, dummyPayload).tryGet()
encoded = signed.encode().tryGet()
check SignedDummy.decode(encoded).error == EnvelopeInvalidSignature
test "Invalid payload type":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
dummyPayload = DummyPayload(awesome: 30.byte)
signed = Envelope.init(privKey, @[55.byte], dummyPayload.encode(), DummyPayload.payloadDomain).tryGet()
encoded = signed.encode().tryGet()
check SignedDummy.decode(encoded).error == EnvelopeWrongType

View File

@@ -841,6 +841,10 @@ suite "Switch":
switch2.peerStore.protoBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.protocols.toHashSet()
asyncTest "e2e should allow multiple local addresses":
when defined(windows):
# this randomly locks the Windows CI job
skip()
return
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
try:
let msg = string.fromBytes(await conn.readLp(1024))