mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-11 06:57:58 -05:00
Compare commits
17 Commits
removeasyn
...
ringbuffer
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01c2c0c54b | ||
|
|
eaa72dcdbe | ||
|
|
c7504d2446 | ||
|
|
cba3ca3c3e | ||
|
|
44a7260f07 | ||
|
|
c09d032133 | ||
|
|
f98bf612bd | ||
|
|
fd59cbc7a9 | ||
|
|
bc318084f4 | ||
|
|
3b718baa97 | ||
|
|
9a7e3bda3c | ||
|
|
00e1f9342f | ||
|
|
07da14a7a7 | ||
|
|
c18830ad33 | ||
|
|
1a97d0a2f5 | ||
|
|
e72d03bc78 | ||
|
|
388b92d58f |
150
.github/workflows/ci.yml
vendored
150
.github/workflows/ci.yml
vendored
@@ -9,78 +9,49 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
timeout-minutes: 90
|
||||
strategy:
|
||||
fail-fast: false
|
||||
max-parallel: 20
|
||||
matrix:
|
||||
target:
|
||||
# Unit tests
|
||||
- os: linux
|
||||
cpu: amd64
|
||||
- os: linux
|
||||
cpu: i386
|
||||
- os: macos
|
||||
cpu: amd64
|
||||
- os: windows
|
||||
cpu: i386
|
||||
- os: windows
|
||||
cpu: amd64
|
||||
#- os: windows
|
||||
#cpu: i386
|
||||
branch: [version-1-2, devel]
|
||||
include:
|
||||
- target:
|
||||
os: linux
|
||||
builder: ubuntu-20.04
|
||||
shell: bash
|
||||
- target:
|
||||
os: macos
|
||||
builder: macos-10.15
|
||||
shell: bash
|
||||
- target:
|
||||
os: windows
|
||||
builder: windows-2019
|
||||
shell: msys2 {0}
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
shell: ${{ matrix.shell }}
|
||||
|
||||
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }}'
|
||||
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
|
||||
runs-on: ${{ matrix.builder }}
|
||||
continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }}
|
||||
steps:
|
||||
- name: Checkout nim-libp2p
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- name: Derive environment variables
|
||||
run: |
|
||||
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
|
||||
ARCH=64
|
||||
PLATFORM=x64
|
||||
else
|
||||
ARCH=32
|
||||
PLATFORM=x86
|
||||
fi
|
||||
echo "ARCH=$ARCH" >> $GITHUB_ENV
|
||||
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
|
||||
|
||||
ncpu=
|
||||
ext=
|
||||
MAKE_CMD="make"
|
||||
case '${{ runner.os }}' in
|
||||
'Linux')
|
||||
ncpu=$(nproc)
|
||||
;;
|
||||
'macOS')
|
||||
ncpu=$(sysctl -n hw.ncpu)
|
||||
;;
|
||||
'Windows')
|
||||
ncpu=$NUMBER_OF_PROCESSORS
|
||||
ext=.exe
|
||||
MAKE_CMD="mingw32-make"
|
||||
;;
|
||||
esac
|
||||
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
|
||||
echo "ncpu=$ncpu" >> $GITHUB_ENV
|
||||
echo "ext=$ext" >> $GITHUB_ENV
|
||||
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
|
||||
|
||||
- name: Install build dependencies (Linux i386)
|
||||
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
|
||||
run: |
|
||||
@@ -101,68 +72,83 @@ jobs:
|
||||
chmod 755 external/bin/gcc external/bin/g++
|
||||
echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH
|
||||
|
||||
- name: Restore MinGW-W64 (Windows) from cache
|
||||
if: runner.os == 'Windows'
|
||||
id: windows-mingw-cache
|
||||
uses: actions/cache@v2
|
||||
- name: MSYS2 (Windows i386)
|
||||
if: runner.os == 'Windows' && matrix.target.cpu == 'i386'
|
||||
uses: msys2/setup-msys2@v2
|
||||
with:
|
||||
path: external/mingw-${{ matrix.target.cpu }}
|
||||
key: 'mingw-${{ matrix.target.cpu }}'
|
||||
path-type: inherit
|
||||
msystem: MINGW32
|
||||
install: >-
|
||||
base-devel
|
||||
git
|
||||
mingw-w64-i686-toolchain
|
||||
|
||||
- name: MSYS2 (Windows amd64)
|
||||
if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
|
||||
uses: msys2/setup-msys2@v2
|
||||
with:
|
||||
path-type: inherit
|
||||
install: >-
|
||||
base-devel
|
||||
git
|
||||
mingw-w64-x86_64-toolchain
|
||||
|
||||
- name: Restore Nim DLLs dependencies (Windows) from cache
|
||||
if: runner.os == 'Windows'
|
||||
id: windows-dlls-cache
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: external/dlls-${{ matrix.target.cpu }}
|
||||
key: 'dlls-${{ matrix.target.cpu }}'
|
||||
path: external/dlls
|
||||
key: 'dlls'
|
||||
|
||||
- name: Install MinGW64 dependency (Windows)
|
||||
if: >
|
||||
steps.windows-mingw-cache.outputs.cache-hit != 'true' &&
|
||||
runner.os == 'Windows'
|
||||
run: |
|
||||
mkdir -p external
|
||||
curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z"
|
||||
7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/
|
||||
mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }}
|
||||
|
||||
- name: Install DLLs dependencies (Windows)
|
||||
- name: Install DLL dependencies (Windows)
|
||||
if: >
|
||||
steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
|
||||
runner.os == 'Windows'
|
||||
run: |
|
||||
mkdir -p external
|
||||
mkdir external
|
||||
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
|
||||
7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
|
||||
7z x external/windeps.zip -oexternal/dlls
|
||||
|
||||
- name: Path to cached dependencies (Windows)
|
||||
if: >
|
||||
runner.os == 'Windows'
|
||||
run: |
|
||||
echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH
|
||||
echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
|
||||
echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH
|
||||
|
||||
- name: Get latest Nim commit hash
|
||||
id: versions
|
||||
- name: Derive environment variables
|
||||
run: |
|
||||
getHash() {
|
||||
git ls-remote "https://github.com/$1" "${2:-HEAD}" | cut -f 1
|
||||
}
|
||||
nbsHash=$(getHash status-im/nimbus-build-system)
|
||||
echo "::set-output name=nimbus_build_system::$nbsHash"
|
||||
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
|
||||
PLATFORM=x64
|
||||
else
|
||||
PLATFORM=x86
|
||||
fi
|
||||
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
|
||||
|
||||
- name: Restore prebuilt Nim from cache
|
||||
id: nim-cache
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: NimBinaries
|
||||
key: 'NimBinaries-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nimbus_build_system }}'
|
||||
ncpu=
|
||||
MAKE_CMD="make"
|
||||
case '${{ runner.os }}' in
|
||||
'Linux')
|
||||
ncpu=$(nproc)
|
||||
;;
|
||||
'macOS')
|
||||
ncpu=$(sysctl -n hw.ncpu)
|
||||
;;
|
||||
'Windows')
|
||||
ncpu=$NUMBER_OF_PROCESSORS
|
||||
MAKE_CMD="mingw32-make"
|
||||
;;
|
||||
esac
|
||||
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
|
||||
echo "ncpu=$ncpu" >> $GITHUB_ENV
|
||||
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
|
||||
|
||||
- name: Build Nim and associated tools
|
||||
- name: Build Nim and Nimble
|
||||
run: |
|
||||
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
|
||||
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} CC=gcc bash build_nim.sh nim csources dist/nimble NimBinaries
|
||||
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ matrix.branch }} \
|
||||
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
|
||||
bash build_nim.sh nim csources dist/nimble NimBinaries
|
||||
echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH
|
||||
|
||||
- name: Setup Go
|
||||
@@ -174,8 +160,14 @@ jobs:
|
||||
run: |
|
||||
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
|
||||
|
||||
- name: Run nim-libp2p tests
|
||||
- name: Run tests
|
||||
run: |
|
||||
if [[ "${{ matrix.target.os }}" == "windows" ]]; then
|
||||
# https://github.com/status-im/nimbus-eth2/issues/3121
|
||||
export NIMFLAGS="-d:nimRawSetjmp"
|
||||
fi
|
||||
nim --version
|
||||
nimble --version
|
||||
nimble install_pinned
|
||||
nimble test
|
||||
|
||||
|
||||
145
.github/workflows/multi_nim.yml
vendored
145
.github/workflows/multi_nim.yml
vendored
@@ -6,6 +6,7 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
timeout-minutes: 120
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -16,69 +17,39 @@ jobs:
|
||||
cpu: i386
|
||||
- os: macos
|
||||
cpu: amd64
|
||||
#- os: windows
|
||||
#cpu: i386
|
||||
- os: windows
|
||||
cpu: amd64
|
||||
#- os: windows
|
||||
#cpu: i386
|
||||
branch: [version-1-2, version-1-4, version-1-6, devel]
|
||||
include:
|
||||
- target:
|
||||
os: linux
|
||||
builder: ubuntu-20.04
|
||||
shell: bash
|
||||
- target:
|
||||
os: macos
|
||||
builder: macos-10.15
|
||||
shell: bash
|
||||
- target:
|
||||
os: windows
|
||||
builder: windows-2019
|
||||
shell: msys2 {0}
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
shell: ${{ matrix.shell }}
|
||||
|
||||
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
|
||||
runs-on: ${{ matrix.builder }}
|
||||
continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }}
|
||||
steps:
|
||||
- name: Checkout nim-libp2p
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
ref: master
|
||||
ref: unstable
|
||||
submodules: true
|
||||
|
||||
- name: Derive environment variables
|
||||
run: |
|
||||
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
|
||||
ARCH=64
|
||||
PLATFORM=x64
|
||||
else
|
||||
ARCH=32
|
||||
PLATFORM=x86
|
||||
fi
|
||||
echo "ARCH=$ARCH" >> $GITHUB_ENV
|
||||
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
|
||||
|
||||
ncpu=
|
||||
ext=
|
||||
MAKE_CMD="make"
|
||||
case '${{ runner.os }}' in
|
||||
'Linux')
|
||||
ncpu=$(nproc)
|
||||
;;
|
||||
'macOS')
|
||||
ncpu=$(sysctl -n hw.ncpu)
|
||||
;;
|
||||
'Windows')
|
||||
ncpu=$NUMBER_OF_PROCESSORS
|
||||
ext=.exe
|
||||
MAKE_CMD="mingw32-make"
|
||||
;;
|
||||
esac
|
||||
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
|
||||
echo "ncpu=$ncpu" >> $GITHUB_ENV
|
||||
echo "ext=$ext" >> $GITHUB_ENV
|
||||
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
|
||||
|
||||
- name: Install build dependencies (Linux i386)
|
||||
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
|
||||
run: |
|
||||
@@ -99,47 +70,76 @@ jobs:
|
||||
chmod 755 external/bin/gcc external/bin/g++
|
||||
echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH
|
||||
|
||||
- name: Restore MinGW-W64 (Windows) from cache
|
||||
if: runner.os == 'Windows'
|
||||
id: windows-mingw-cache
|
||||
uses: actions/cache@v2
|
||||
- name: MSYS2 (Windows i386)
|
||||
if: runner.os == 'Windows' && matrix.target.cpu == 'i386'
|
||||
uses: msys2/setup-msys2@v2
|
||||
with:
|
||||
path: external/mingw-${{ matrix.target.cpu }}
|
||||
key: 'mingw-${{ matrix.target.cpu }}'
|
||||
path-type: inherit
|
||||
msystem: MINGW32
|
||||
install: >-
|
||||
base-devel
|
||||
git
|
||||
mingw-w64-i686-toolchain
|
||||
|
||||
- name: MSYS2 (Windows amd64)
|
||||
if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
|
||||
uses: msys2/setup-msys2@v2
|
||||
with:
|
||||
path-type: inherit
|
||||
install: >-
|
||||
base-devel
|
||||
git
|
||||
mingw-w64-x86_64-toolchain
|
||||
|
||||
- name: Restore Nim DLLs dependencies (Windows) from cache
|
||||
if: runner.os == 'Windows'
|
||||
id: windows-dlls-cache
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: external/dlls-${{ matrix.target.cpu }}
|
||||
key: 'dlls-${{ matrix.target.cpu }}'
|
||||
path: external/dlls
|
||||
key: 'dlls'
|
||||
|
||||
- name: Install MinGW64 dependency (Windows)
|
||||
if: >
|
||||
steps.windows-mingw-cache.outputs.cache-hit != 'true' &&
|
||||
runner.os == 'Windows'
|
||||
run: |
|
||||
mkdir -p external
|
||||
curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z"
|
||||
7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/
|
||||
mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }}
|
||||
|
||||
- name: Install DLLs dependencies (Windows)
|
||||
- name: Install DLL dependencies (Windows)
|
||||
if: >
|
||||
steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
|
||||
runner.os == 'Windows'
|
||||
run: |
|
||||
mkdir -p external
|
||||
mkdir external
|
||||
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
|
||||
7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
|
||||
7z x external/windeps.zip -oexternal/dlls
|
||||
|
||||
- name: Path to cached dependencies (Windows)
|
||||
if: >
|
||||
runner.os == 'Windows'
|
||||
run: |
|
||||
echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH
|
||||
echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
|
||||
echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH
|
||||
|
||||
- name: Derive environment variables
|
||||
run: |
|
||||
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
|
||||
PLATFORM=x64
|
||||
else
|
||||
PLATFORM=x86
|
||||
fi
|
||||
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
|
||||
|
||||
ncpu=
|
||||
MAKE_CMD="make"
|
||||
case '${{ runner.os }}' in
|
||||
'Linux')
|
||||
ncpu=$(nproc)
|
||||
;;
|
||||
'macOS')
|
||||
ncpu=$(sysctl -n hw.ncpu)
|
||||
;;
|
||||
'Windows')
|
||||
ncpu=$NUMBER_OF_PROCESSORS
|
||||
MAKE_CMD="mingw32-make"
|
||||
;;
|
||||
esac
|
||||
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
|
||||
echo "ncpu=$ncpu" >> $GITHUB_ENV
|
||||
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
|
||||
|
||||
- name: Build Nim and Nimble
|
||||
run: |
|
||||
@@ -149,12 +149,27 @@ jobs:
|
||||
bash build_nim.sh nim csources dist/nimble NimBinaries
|
||||
echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH
|
||||
|
||||
- name: Run nim-libp2p tests
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.15.5'
|
||||
|
||||
- name: Install p2pd
|
||||
run: |
|
||||
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
|
||||
|
||||
- name: Run tests
|
||||
run: |
|
||||
if [[ "${{ matrix.target.os }}" == "windows" ]]; then
|
||||
# https://github.com/status-im/nimbus-eth2/issues/3121
|
||||
export NIMFLAGS="-d:nimRawSetjmp"
|
||||
fi
|
||||
nim --version
|
||||
nimble --version
|
||||
nimble install -y --depsOnly
|
||||
nimble test_slim
|
||||
nimble test
|
||||
if [[ "${{ matrix.branch }}" == "version-1-6" || "${{ matrix.branch }}" == "devel" ]]; then
|
||||
echo -e "\nTesting with '--gc:orc':\n"
|
||||
export NIMFLAGS="--gc:orc"
|
||||
nimble test_slim
|
||||
export NIMFLAGS="${NIMFLAGS} --gc:orc"
|
||||
nimble test
|
||||
fi
|
||||
|
||||
20
.pinned
20
.pinned
@@ -1,17 +1,17 @@
|
||||
asynctest;https://github.com/markspanbroek/asynctest@#3882ed64ed3159578f796bc5ae0c6b13837fe798
|
||||
bearssl;https://github.com/status-im/nim-bearssl@#ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7
|
||||
chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882
|
||||
chronos;https://github.com/status-im/nim-chronos@#7dc58d42b6905a7fd7531875fa76060f8f744e4e
|
||||
chronos;https://github.com/status-im/nim-chronos@#87197230779002a2bfa8642f0e2ae07e2349e304
|
||||
dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be
|
||||
faststreams;https://github.com/status-im/nim-faststreams@#c653d05f277dca0f374732c5b9b80f2368faea33
|
||||
httputils;https://github.com/status-im/nim-http-utils@#507bfb7dcb6244d76ce2567df7bf3756cbe88775
|
||||
json_serialization;https://github.com/status-im/nim-json-serialization@#010aa238cf6afddf1fbe4cbcd27ab3be3f443841
|
||||
metrics;https://github.com/status-im/nim-metrics@#2c0c486c65f980e8387f86bed0b43d53161c8286
|
||||
faststreams;https://github.com/status-im/nim-faststreams@#37a183153c071539ab870f427c09a1376ba311b9
|
||||
httputils;https://github.com/status-im/nim-http-utils@#40048e8b3e69284bdb5d4daa0a16ad93402c55db
|
||||
json_serialization;https://github.com/status-im/nim-json-serialization@#4b8f487d2dfdd941df7408ceaa70b174cce02180
|
||||
metrics;https://github.com/status-im/nim-metrics@#71e0f0e354e1f4c59e3dc92153989c8b723c3440
|
||||
nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00
|
||||
secp256k1;https://github.com/status-im/nim-secp256k1@#d790c42206fab4b8008eaa91181ca8c8c68a0105
|
||||
serialization;https://github.com/status-im/nim-serialization@#11a8aa64d27d4fa92e266b9488500461da193c24
|
||||
stew;https://github.com/status-im/nim-stew@#2f9c61f485e1de6d7e163294008276c455d39da2
|
||||
secp256k1;https://github.com/status-im/nim-secp256k1@#e092373a5cbe1fa25abfc62e0f2a5f138dc3fb13
|
||||
serialization;https://github.com/status-im/nim-serialization@#37bc0db558d85711967acb16e9bb822b06911d46
|
||||
stew;https://github.com/status-im/nim-stew@#bb705bf17b46d2c8f9bfb106d9cc7437009a2501
|
||||
testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2
|
||||
unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c
|
||||
websock;https://github.com/status-im/nim-websock@#c2aae352f7fad7a8d333327c37e966969d3ee542
|
||||
zlib;https://github.com/status-im/nim-zlib@#d4e716d071eba1b5e0ffdf7949d983959e2b95dd
|
||||
websock;https://github.com/status-im/nim-websock@#853299e399746eff4096870067cbc61861ecd534
|
||||
zlib;https://github.com/status-im/nim-zlib@#74cdeb54b21bededb5a515d36f608bc1850555a2
|
||||
|
||||
@@ -27,9 +27,8 @@ const nimflags =
|
||||
|
||||
proc runTest(filename: string, verify: bool = true, sign: bool = true,
|
||||
moreoptions: string = "") =
|
||||
let env_nimflags = getEnv("NIMFLAGS")
|
||||
var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:libp2p_network_protocols_metrics "
|
||||
excstr.add(" " & env_nimflags & " ")
|
||||
excstr.add(" " & getEnv("NIMFLAGS") & " ")
|
||||
excstr.add(" " & nimflags & " ")
|
||||
excstr.add(" -d:libp2p_pubsub_sign=" & $sign)
|
||||
excstr.add(" -d:libp2p_pubsub_verify=" & $verify)
|
||||
@@ -91,7 +90,7 @@ task test, "Runs the test suite":
|
||||
exec "nimble testfilter"
|
||||
exec "nimble examples_build"
|
||||
|
||||
task test_slim, "Runs the test suite":
|
||||
task test_slim, "Runs the (slimmed down) test suite":
|
||||
exec "nimble testnative"
|
||||
exec "nimble testpubsub_slim"
|
||||
exec "nimble testfilter"
|
||||
|
||||
@@ -42,6 +42,7 @@ type
|
||||
rng: ref BrHmacDrbgContext
|
||||
maxConnections: int
|
||||
maxIn: int
|
||||
sendSignedPeerRecord: bool
|
||||
maxOut: int
|
||||
maxConnsPerPeer: int
|
||||
protoVersion: string
|
||||
@@ -77,6 +78,9 @@ proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress]): SwitchBuild
|
||||
b.addresses = addresses
|
||||
b
|
||||
|
||||
proc withSignedPeerRecord*(b: SwitchBuilder, sendIt = true): SwitchBuilder =
|
||||
b.sendSignedPeerRecord = sendIt
|
||||
b
|
||||
|
||||
proc withMplex*(b: SwitchBuilder, inTimeout = 5.minutes, outTimeout = 5.minutes): SwitchBuilder =
|
||||
proc newMuxer(conn: Connection): Muxer =
|
||||
@@ -165,7 +169,7 @@ proc build*(b: SwitchBuilder): Switch
|
||||
muxers
|
||||
|
||||
let
|
||||
identify = Identify.new(peerInfo)
|
||||
identify = Identify.new(peerInfo, b.sendSignedPeerRecord)
|
||||
connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut)
|
||||
ms = MultistreamSelect.new()
|
||||
muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagerInstances, connManager, ms)
|
||||
@@ -209,7 +213,8 @@ proc newStandardSwitch*(
|
||||
maxIn = -1,
|
||||
maxOut = -1,
|
||||
maxConnsPerPeer = MaxConnectionsPerPeer,
|
||||
nameResolver: NameResolver = nil): Switch
|
||||
nameResolver: NameResolver = nil,
|
||||
sendSignedPeerRecord = false): Switch
|
||||
{.raises: [Defect, LPError].} =
|
||||
if SecureProtocol.Secio in secureManagers:
|
||||
quit("Secio is deprecated!") # use of secio is unsafe
|
||||
@@ -219,6 +224,7 @@ proc newStandardSwitch*(
|
||||
.new()
|
||||
.withAddresses(addrs)
|
||||
.withRng(rng)
|
||||
.withSignedPeerRecord(sendSignedPeerRecord)
|
||||
.withMaxConnections(maxConnections)
|
||||
.withMaxIn(maxIn)
|
||||
.withMaxOut(maxOut)
|
||||
|
||||
@@ -452,7 +452,8 @@ proc trackIncomingConn*(c: ConnManager,
|
||||
raise exc
|
||||
|
||||
proc trackOutgoingConn*(c: ConnManager,
|
||||
provider: ConnProvider):
|
||||
provider: ConnProvider,
|
||||
forceDial = false):
|
||||
Future[Connection] {.async.} =
|
||||
## try acquiring a connection if all slots
|
||||
## are already taken, raise TooManyConnectionsError
|
||||
@@ -462,7 +463,9 @@ proc trackOutgoingConn*(c: ConnManager,
|
||||
trace "Tracking outgoing connection", count = c.outSema.count,
|
||||
max = c.outSema.size
|
||||
|
||||
if not c.outSema.tryAcquire():
|
||||
if forceDial:
|
||||
c.outSema.forceAcquire()
|
||||
elif not c.outSema.tryAcquire():
|
||||
trace "Too many outgoing connections!", count = c.outSema.count,
|
||||
max = c.outSema.size
|
||||
raise newTooManyConnectionsError()
|
||||
|
||||
@@ -19,7 +19,8 @@ type
|
||||
method connect*(
|
||||
self: Dial,
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress]) {.async, base.} =
|
||||
addrs: seq[MultiAddress],
|
||||
forceDial = false) {.async, base.} =
|
||||
## connect remote peer without negotiating
|
||||
## a protocol
|
||||
##
|
||||
@@ -29,7 +30,8 @@ method connect*(
|
||||
method dial*(
|
||||
self: Dial,
|
||||
peerId: PeerId,
|
||||
protos: seq[string]): Future[Connection] {.async, base.} =
|
||||
protos: seq[string],
|
||||
): Future[Connection] {.async, base.} =
|
||||
## create a protocol stream over an
|
||||
## existing connection
|
||||
##
|
||||
@@ -40,7 +42,8 @@ method dial*(
|
||||
self: Dial,
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress],
|
||||
protos: seq[string]): Future[Connection] {.async, base.} =
|
||||
protos: seq[string],
|
||||
forceDial = false): Future[Connection] {.async, base.} =
|
||||
## create a protocol stream and establish
|
||||
## a connection if one doesn't exist already
|
||||
##
|
||||
|
||||
@@ -47,7 +47,8 @@ type
|
||||
proc dialAndUpgrade(
|
||||
self: Dialer,
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress]):
|
||||
addrs: seq[MultiAddress],
|
||||
forceDial: bool):
|
||||
Future[Connection] {.async.} =
|
||||
debug "Dialing peer", peerId
|
||||
|
||||
@@ -72,7 +73,8 @@ proc dialAndUpgrade(
|
||||
transportCopy = transport
|
||||
addressCopy = a
|
||||
await self.connManager.trackOutgoingConn(
|
||||
() => transportCopy.dial(hostname, addressCopy)
|
||||
() => transportCopy.dial(hostname, addressCopy),
|
||||
forceDial
|
||||
)
|
||||
except TooManyConnectionsError as exc:
|
||||
trace "Connection limit reached!"
|
||||
@@ -112,7 +114,8 @@ proc dialAndUpgrade(
|
||||
proc internalConnect(
|
||||
self: Dialer,
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress]):
|
||||
addrs: seq[MultiAddress],
|
||||
forceDial: bool):
|
||||
Future[Connection] {.async.} =
|
||||
if self.localPeerId == peerId:
|
||||
raise newException(CatchableError, "can't dial self!")
|
||||
@@ -136,7 +139,7 @@ proc internalConnect(
|
||||
trace "Reusing existing connection", conn, direction = $conn.dir
|
||||
return conn
|
||||
|
||||
conn = await self.dialAndUpgrade(peerId, addrs)
|
||||
conn = await self.dialAndUpgrade(peerId, addrs, forceDial)
|
||||
if isNil(conn): # None of the addresses connected
|
||||
raise newException(DialFailedError, "Unable to establish outgoing link")
|
||||
|
||||
@@ -159,7 +162,8 @@ proc internalConnect(
|
||||
method connect*(
|
||||
self: Dialer,
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress]) {.async.} =
|
||||
addrs: seq[MultiAddress],
|
||||
forceDial = false) {.async.} =
|
||||
## connect remote peer without negotiating
|
||||
## a protocol
|
||||
##
|
||||
@@ -167,7 +171,7 @@ method connect*(
|
||||
if self.connManager.connCount(peerId) > 0:
|
||||
return
|
||||
|
||||
discard await self.internalConnect(peerId, addrs)
|
||||
discard await self.internalConnect(peerId, addrs, forceDial)
|
||||
|
||||
proc negotiateStream(
|
||||
self: Dialer,
|
||||
@@ -200,7 +204,8 @@ method dial*(
|
||||
self: Dialer,
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress],
|
||||
protos: seq[string]): Future[Connection] {.async.} =
|
||||
protos: seq[string],
|
||||
forceDial = false): Future[Connection] {.async.} =
|
||||
## create a protocol stream and establish
|
||||
## a connection if one doesn't exist already
|
||||
##
|
||||
@@ -218,7 +223,7 @@ method dial*(
|
||||
|
||||
try:
|
||||
trace "Dialing (new)", peerId, protos
|
||||
conn = await self.internalConnect(peerId, addrs)
|
||||
conn = await self.internalConnect(peerId, addrs, forceDial)
|
||||
trace "Opening stream", conn
|
||||
stream = await self.connManager.getStream(conn)
|
||||
|
||||
|
||||
@@ -11,9 +11,9 @@
|
||||
|
||||
import std/[options, sequtils, hashes]
|
||||
import pkg/[chronos, chronicles, stew/results]
|
||||
import peerid, multiaddress, crypto/crypto, errors
|
||||
import peerid, multiaddress, crypto/crypto, routing_record, errors
|
||||
|
||||
export peerid, multiaddress, crypto, errors, results
|
||||
export peerid, multiaddress, crypto, routing_record, errors, results
|
||||
|
||||
## Our local peer info
|
||||
|
||||
@@ -28,6 +28,7 @@ type
|
||||
agentVersion*: string
|
||||
privateKey*: PrivateKey
|
||||
publicKey*: PublicKey
|
||||
signedPeerRecord*: Option[Envelope]
|
||||
|
||||
func shortLog*(p: PeerInfo): auto =
|
||||
(
|
||||
@@ -52,14 +53,26 @@ proc new*(
|
||||
key.getPublicKey().tryGet()
|
||||
except CatchableError:
|
||||
raise newException(PeerInfoError, "invalid private key")
|
||||
|
||||
let peerId = PeerID.init(key).tryGet()
|
||||
|
||||
let sprRes = SignedPeerRecord.init(
|
||||
key,
|
||||
PeerRecord.init(peerId, @addrs)
|
||||
)
|
||||
let spr = if sprRes.isOk:
|
||||
some(sprRes.get().envelope)
|
||||
else:
|
||||
none(Envelope)
|
||||
|
||||
let peerInfo = PeerInfo(
|
||||
peerId: PeerId.init(key).tryGet(),
|
||||
peerId: peerId,
|
||||
publicKey: pubkey,
|
||||
privateKey: key,
|
||||
protoVersion: protoVersion,
|
||||
agentVersion: agentVersion,
|
||||
addrs: @addrs,
|
||||
protocols: @protocols)
|
||||
protocols: @protocols,
|
||||
signedPeerRecord: spr)
|
||||
|
||||
return peerInfo
|
||||
|
||||
@@ -14,6 +14,7 @@ import
|
||||
./crypto/crypto,
|
||||
./protocols/identify,
|
||||
./peerid, ./peerinfo,
|
||||
./routing_record,
|
||||
./multiaddress
|
||||
|
||||
type
|
||||
@@ -53,6 +54,8 @@ type
|
||||
|
||||
agentBook*: PeerBook[string]
|
||||
protoVersionBook*: PeerBook[string]
|
||||
|
||||
signedPeerRecordBook*: PeerBook[Envelope]
|
||||
|
||||
## Constructs a new PeerStore with metadata of type M
|
||||
proc new*(T: type PeerStore): PeerStore =
|
||||
@@ -160,3 +163,6 @@ proc updatePeerInfo*(
|
||||
|
||||
if info.protos.len > 0:
|
||||
peerStore.protoBook.set(info.peerId, info.protos)
|
||||
|
||||
if info.signedPeerRecord.isSome:
|
||||
peerStore.signedPeerRecordBook.set(info.peerId, info.signedPeerRecord.get())
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import options
|
||||
import std/[sequtils, options, strutils, sugar]
|
||||
import chronos, chronicles
|
||||
import ../protobuf/minprotobuf,
|
||||
../peerinfo,
|
||||
@@ -44,9 +44,11 @@ type
|
||||
protoVersion*: Option[string]
|
||||
agentVersion*: Option[string]
|
||||
protos*: seq[string]
|
||||
signedPeerRecord*: Option[Envelope]
|
||||
|
||||
Identify* = ref object of LPProtocol
|
||||
peerInfo*: PeerInfo
|
||||
sendSignedPeerRecord*: bool
|
||||
|
||||
IdentifyPushHandler* = proc (
|
||||
peer: PeerId,
|
||||
@@ -57,8 +59,23 @@ type
|
||||
IdentifyPush* = ref object of LPProtocol
|
||||
identifyHandler: IdentifyPushHandler
|
||||
|
||||
proc encodeMsg*(peerInfo: PeerInfo, observedAddr: MultiAddress): ProtoBuffer
|
||||
{.raises: [Defect, IdentifyNoPubKeyError].} =
|
||||
chronicles.expandIt(IdentifyInfo):
|
||||
pubkey = ($it.pubkey).shortLog
|
||||
addresses = it.addrs.map(x => $x).join(",")
|
||||
protocols = it.protos.map(x => $x).join(",")
|
||||
observable_address =
|
||||
if it.observedAddr.isSome(): $it.observedAddr.get()
|
||||
else: "None"
|
||||
proto_version = it.protoVersion.get("None")
|
||||
agent_version = it.agentVersion.get("None")
|
||||
signedPeerRecord =
|
||||
# The SPR contains the same data as the identify message
|
||||
# would be cumbersome to log
|
||||
if iinfo.signedPeerRecord.isSome(): "Some"
|
||||
else: "None"
|
||||
|
||||
proc encodeMsg(peerInfo: PeerInfo, observedAddr: MultiAddress, sendSpr: bool): ProtoBuffer
|
||||
{.raises: [Defect].} =
|
||||
result = initProtoBuffer()
|
||||
|
||||
let pkey = peerInfo.publicKey
|
||||
@@ -76,6 +93,14 @@ proc encodeMsg*(peerInfo: PeerInfo, observedAddr: MultiAddress): ProtoBuffer
|
||||
else:
|
||||
peerInfo.agentVersion
|
||||
result.write(6, agentVersion)
|
||||
|
||||
## Optionally populate signedPeerRecord field.
|
||||
## See https://github.com/libp2p/go-libp2p/blob/ddf96ce1cfa9e19564feb9bd3e8269958bbc0aba/p2p/protocol/identify/pb/identify.proto for reference.
|
||||
if peerInfo.signedPeerRecord.isSome() and sendSpr:
|
||||
let sprBuff = peerInfo.signedPeerRecord.get().encode()
|
||||
if sprBuff.isOk():
|
||||
result.write(8, sprBuff.get())
|
||||
|
||||
result.finish()
|
||||
|
||||
proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
|
||||
@@ -85,6 +110,7 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
|
||||
oaddr: MultiAddress
|
||||
protoVersion: string
|
||||
agentVersion: string
|
||||
signedPeerRecord: SignedPeerRecord
|
||||
|
||||
var pb = initProtoBuffer(buf)
|
||||
|
||||
@@ -95,8 +121,11 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
|
||||
let r5 = pb.getField(5, protoVersion)
|
||||
let r6 = pb.getField(6, agentVersion)
|
||||
|
||||
let r8 = pb.getField(8, signedPeerRecord)
|
||||
|
||||
let res = r1.isOk() and r2.isOk() and r3.isOk() and
|
||||
r4.isOk() and r5.isOk() and r6.isOk()
|
||||
r4.isOk() and r5.isOk() and r6.isOk() and
|
||||
r8.isOk()
|
||||
|
||||
if res:
|
||||
if r1.get():
|
||||
@@ -107,18 +136,24 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
|
||||
iinfo.protoVersion = some(protoVersion)
|
||||
if r6.get():
|
||||
iinfo.agentVersion = some(agentVersion)
|
||||
debug "decodeMsg: decoded message", pubkey = ($pubkey).shortLog,
|
||||
addresses = $iinfo.addrs, protocols = $iinfo.protos,
|
||||
observable_address = $iinfo.observedAddr,
|
||||
proto_version = $iinfo.protoVersion,
|
||||
agent_version = $iinfo.agentVersion
|
||||
if r8.get() and r1.get():
|
||||
if iinfo.pubkey.get() == signedPeerRecord.envelope.publicKey:
|
||||
iinfo.signedPeerRecord = some(signedPeerRecord.envelope)
|
||||
debug "decodeMsg: decoded identify", iinfo
|
||||
some(iinfo)
|
||||
else:
|
||||
trace "decodeMsg: failed to decode received message"
|
||||
none[IdentifyInfo]()
|
||||
|
||||
proc new*(T: typedesc[Identify], peerInfo: PeerInfo): T =
|
||||
let identify = T(peerInfo: peerInfo)
|
||||
proc new*(
|
||||
T: typedesc[Identify],
|
||||
peerInfo: PeerInfo,
|
||||
sendSignedPeerRecord = false
|
||||
): T =
|
||||
let identify = T(
|
||||
peerInfo: peerInfo,
|
||||
sendSignedPeerRecord: sendSignedPeerRecord
|
||||
)
|
||||
identify.init()
|
||||
identify
|
||||
|
||||
@@ -126,7 +161,7 @@ method init*(p: Identify) =
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
try:
|
||||
trace "handling identify request", conn
|
||||
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
|
||||
var pb = encodeMsg(p.peerInfo, conn.observedAddr, p.sendSignedPeerRecord)
|
||||
await conn.writeLp(pb.buffer)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
@@ -206,5 +241,5 @@ proc init*(p: IdentifyPush) =
|
||||
p.codec = IdentifyPushCodec
|
||||
|
||||
proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async.} =
|
||||
var pb = encodeMsg(peerInfo, conn.observedAddr)
|
||||
var pb = encodeMsg(peerInfo, conn.observedAddr, true)
|
||||
await conn.writeLp(pb.buffer)
|
||||
|
||||
6
libp2p/protocols/pubsub/errors.nim
Normal file
6
libp2p/protocols/pubsub/errors.nim
Normal file
@@ -0,0 +1,6 @@
|
||||
# this module will be further extended in PR
|
||||
# https://github.com/status-im/nim-libp2p/pull/107/
|
||||
|
||||
type
|
||||
ValidationResult* {.pure.} = enum
|
||||
Accept, Reject, Ignore
|
||||
@@ -96,7 +96,14 @@ method rpcHandler*(f: FloodSub,
|
||||
f.handleSubscribe(peer, sub.topic, sub.subscribe)
|
||||
|
||||
for msg in rpcMsg.messages: # for every message
|
||||
let msgId = f.msgIdProvider(msg)
|
||||
let msgIdResult = f.msgIdProvider(msg)
|
||||
if msgIdResult.isErr:
|
||||
debug "Dropping message due to failed message id generation",
|
||||
error = msgIdResult.error
|
||||
# TODO: descore peers due to error during message validation (malicious?)
|
||||
continue
|
||||
|
||||
let msgId = msgIdResult.get
|
||||
|
||||
if f.addSeen(msgId):
|
||||
trace "Dropping already-seen message", msgId, peer
|
||||
@@ -184,7 +191,14 @@ method publish*(f: FloodSub,
|
||||
Message.init(none(PeerInfo), data, topic, none(uint64), false)
|
||||
else:
|
||||
Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign)
|
||||
msgId = f.msgIdProvider(msg)
|
||||
msgIdResult = f.msgIdProvider(msg)
|
||||
|
||||
if msgIdResult.isErr:
|
||||
trace "Error generating message id, skipping publish",
|
||||
error = msgIdResult.error
|
||||
return 0
|
||||
|
||||
let msgId = msgIdResult.get
|
||||
|
||||
trace "Created new message",
|
||||
msg = shortLog(msg), peers = peers.len, topic, msgId
|
||||
|
||||
@@ -186,16 +186,16 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
|
||||
if s[].len == 0:
|
||||
g.peersInIP.del(pubSubPeer.address.get())
|
||||
|
||||
for t in toSeq(g.gossipsub.keys):
|
||||
g.gossipsub.removePeer(t, pubSubPeer)
|
||||
# also try to remove from explicit table here
|
||||
g.explicit.removePeer(t, pubSubPeer)
|
||||
|
||||
for t in toSeq(g.mesh.keys):
|
||||
trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score
|
||||
g.pruned(pubSubPeer, t)
|
||||
g.mesh.removePeer(t, pubSubPeer)
|
||||
|
||||
for t in toSeq(g.gossipsub.keys):
|
||||
g.gossipsub.removePeer(t, pubSubPeer)
|
||||
# also try to remove from explicit table here
|
||||
g.explicit.removePeer(t, pubSubPeer)
|
||||
|
||||
for t in toSeq(g.fanout.keys):
|
||||
g.fanout.removePeer(t, pubSubPeer)
|
||||
|
||||
@@ -237,9 +237,14 @@ proc handleSubscribe*(g: GossipSub,
|
||||
else:
|
||||
trace "peer unsubscribed from topic"
|
||||
|
||||
if g.mesh.hasPeer(topic, peer):
|
||||
#against spec
|
||||
g.mesh.removePeer(topic, peer)
|
||||
g.pruned(peer, topic)
|
||||
|
||||
# unsubscribe remote peer from the topic
|
||||
g.gossipsub.removePeer(topic, peer)
|
||||
g.mesh.removePeer(topic, peer)
|
||||
|
||||
g.fanout.removePeer(topic, peer)
|
||||
if peer.peerId in g.parameters.directPeers:
|
||||
g.explicit.removePeer(topic, peer)
|
||||
@@ -357,8 +362,16 @@ method rpcHandler*(g: GossipSub,
|
||||
|
||||
for i in 0..<rpcMsg.messages.len(): # for every message
|
||||
template msg: untyped = rpcMsg.messages[i]
|
||||
let msgIdResult = g.msgIdProvider(msg)
|
||||
|
||||
if msgIdResult.isErr:
|
||||
debug "Dropping message due to failed message id generation",
|
||||
error = msgIdResult.error
|
||||
# TODO: descore peers due to error during message validation (malicious?)
|
||||
continue
|
||||
|
||||
let
|
||||
msgId = g.msgIdProvider(msg)
|
||||
msgId = msgIdResult.get
|
||||
msgIdSalted = msgId & g.seenSalt
|
||||
|
||||
# addSeen adds salt to msgId to avoid
|
||||
@@ -469,13 +482,23 @@ method publish*(g: GossipSub,
|
||||
|
||||
if topic in g.topics: # if we're subscribed use the mesh
|
||||
peers.incl(g.mesh.getOrDefault(topic))
|
||||
else: # not subscribed, send to fanout peers
|
||||
# try optimistically
|
||||
peers.incl(g.fanout.getOrDefault(topic))
|
||||
if peers.len == 0:
|
||||
# ok we had nothing.. let's try replenish inline
|
||||
|
||||
if peers.len < g.parameters.dLow and g.parameters.floodPublish == false:
|
||||
# not subscribed or bad mesh, send to fanout peers
|
||||
# disable for floodPublish, since we already sent to every good peer
|
||||
#
|
||||
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
||||
if fanoutPeers.len == 0:
|
||||
g.replenishFanout(topic)
|
||||
peers.incl(g.fanout.getOrDefault(topic))
|
||||
fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
||||
|
||||
g.rng.shuffle(fanoutPeers)
|
||||
if fanoutPeers.len + peers.len > g.parameters.d:
|
||||
fanoutPeers.setLen(g.parameters.d - peers.len)
|
||||
|
||||
for fanPeer in fanoutPeers:
|
||||
peers.incl(fanPeer)
|
||||
if peers.len > g.parameters.d: break
|
||||
|
||||
# even if we couldn't publish,
|
||||
# we still attempted to publish
|
||||
@@ -500,7 +523,15 @@ method publish*(g: GossipSub,
|
||||
Message.init(none(PeerInfo), data, topic, none(uint64), false)
|
||||
else:
|
||||
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
|
||||
msgId = g.msgIdProvider(msg)
|
||||
msgIdResult = g.msgIdProvider(msg)
|
||||
|
||||
if msgIdResult.isErr:
|
||||
trace "Error generating message id, skipping publish",
|
||||
error = msgIdResult.error
|
||||
libp2p_gossipsub_failed_publish.inc()
|
||||
return 0
|
||||
|
||||
let msgId = msgIdResult.get
|
||||
|
||||
logScope: msgId = shortLog(msgId)
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import chronos, chronicles, metrics
|
||||
import "."/[types, scoring]
|
||||
import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]
|
||||
import "../rpc"/[messages]
|
||||
import "../../.."/[peerid, multiaddress, utility, switch]
|
||||
import "../../.."/[peerid, multiaddress, utility, switch, routing_record, signed_envelope]
|
||||
|
||||
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
|
||||
@@ -83,8 +83,16 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:
|
||||
x.score >= 0.0
|
||||
# by spec, larger then Dhi, but let's put some hard caps
|
||||
peers.setLen(min(peers.len, g.parameters.dHigh * 2))
|
||||
let sprBook = g.switch.peerStore.signedPeerRecordBook
|
||||
peers.map do (x: PubSubPeer) -> PeerInfoMsg:
|
||||
PeerInfoMsg(peerId: x.peerId.getBytes())
|
||||
PeerInfoMsg(
|
||||
peerId: x.peerId,
|
||||
signedPeerRecord:
|
||||
if x.peerId in sprBook:
|
||||
sprBook.get(x.peerId).encode().get(default(seq[byte]))
|
||||
else:
|
||||
default(seq[byte])
|
||||
)
|
||||
|
||||
proc handleGraft*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
@@ -165,6 +173,29 @@ proc handleGraft*(g: GossipSub,
|
||||
|
||||
return prunes
|
||||
|
||||
proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRecord])] =
|
||||
var routingRecords: seq[(PeerId, Option[PeerRecord])]
|
||||
for record in prune.peers:
|
||||
let peerRecord =
|
||||
if record.signedPeerRecord.len == 0:
|
||||
none(PeerRecord)
|
||||
else:
|
||||
let signedRecord = SignedPeerRecord.decode(record.signedPeerRecord)
|
||||
if signedRecord.isErr:
|
||||
trace "peer sent invalid SPR", peer, error=signedRecord.error
|
||||
none(PeerRecord)
|
||||
else:
|
||||
if record.peerID != signedRecord.get().data.peerId:
|
||||
trace "peer sent envelope with wrong public key", peer
|
||||
none(PeerRecord)
|
||||
else:
|
||||
some(signedRecord.get().data)
|
||||
|
||||
routingRecords.add((record.peerId, peerRecord))
|
||||
|
||||
routingRecords
|
||||
|
||||
|
||||
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} =
|
||||
for prune in prunes:
|
||||
let topic = prune.topicID
|
||||
@@ -190,9 +221,12 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
|
||||
g.pruned(peer, topic, setBackoff = false)
|
||||
g.mesh.removePeer(topic, peer)
|
||||
|
||||
# TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
|
||||
# another option could be to implement signed peer records
|
||||
## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0:
|
||||
if peer.score > g.parameters.gossipThreshold and prune.peers.len > 0 and
|
||||
g.routingRecordsHandler.len > 0:
|
||||
let routingRecords = prune.getPeers(peer)
|
||||
|
||||
for handler in g.routingRecordsHandler:
|
||||
handler(peer.peerId, topic, routingRecords)
|
||||
|
||||
proc handleIHave*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
@@ -489,9 +523,11 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [Defect].} =
|
||||
logScope: topic
|
||||
trace "about to replenish fanout"
|
||||
|
||||
let currentMesh = g.mesh.getOrDefault(topic)
|
||||
if g.fanout.peers(topic) < g.parameters.dLow:
|
||||
trace "replenishing fanout", peers = g.fanout.peers(topic)
|
||||
for peer in g.gossipsub.getOrDefault(topic):
|
||||
if peer in currentMesh: continue
|
||||
if g.fanout.addPeer(topic, peer):
|
||||
if g.fanout.peers(topic) == g.parameters.d:
|
||||
break
|
||||
|
||||
@@ -142,6 +142,13 @@ type
|
||||
BackoffTable* = Table[string, Table[PeerId, Moment]]
|
||||
ValidationSeenTable* = Table[MessageID, HashSet[PubSubPeer]]
|
||||
|
||||
RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]]
|
||||
RoutingRecordsHandler* =
|
||||
proc(peer: PeerId,
|
||||
tag: string, # For gossipsub, the topic
|
||||
peers: seq[RoutingRecordsPair])
|
||||
{.gcsafe, raises: [Defect].}
|
||||
|
||||
GossipSub* = ref object of FloodSub
|
||||
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
|
||||
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
|
||||
@@ -153,7 +160,7 @@ type
|
||||
control*: Table[string, ControlMessage] # pending control messages
|
||||
mcache*: MCache # messages cache
|
||||
validationSeen*: ValidationSeenTable # peers who sent us message in validation
|
||||
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
|
||||
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
|
||||
heartbeatRunning*: bool
|
||||
|
||||
peerStats*: Table[PeerId, PeerStats]
|
||||
@@ -161,6 +168,7 @@ type
|
||||
topicParams*: Table[string, TopicParams]
|
||||
directPeersLoop*: Future[void]
|
||||
peersInIP*: Table[MultiAddress, HashSet[PeerId]]
|
||||
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
|
||||
|
||||
heartbeatEvents*: seq[AsyncEvent]
|
||||
|
||||
|
||||
@@ -11,7 +11,8 @@
|
||||
|
||||
import std/[tables, sequtils, sets, strutils]
|
||||
import chronos, chronicles, metrics, bearssl
|
||||
import ./pubsubpeer,
|
||||
import ./errors as pubsub_errors,
|
||||
./pubsubpeer,
|
||||
./rpc/[message, messages, protobuf],
|
||||
../../switch,
|
||||
../protocol,
|
||||
@@ -29,6 +30,7 @@ export results
|
||||
export PubSubPeer
|
||||
export PubSubObserver
|
||||
export protocol
|
||||
export pubsub_errors
|
||||
|
||||
logScope:
|
||||
topics = "libp2p pubsub"
|
||||
@@ -76,16 +78,13 @@ type
|
||||
TopicHandler* = proc(topic: string,
|
||||
data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
|
||||
|
||||
ValidationResult* {.pure.} = enum
|
||||
Accept, Reject, Ignore
|
||||
|
||||
ValidatorHandler* = proc(topic: string,
|
||||
message: Message): Future[ValidationResult] {.gcsafe, raises: [Defect].}
|
||||
|
||||
TopicPair* = tuple[topic: string, handler: TopicHandler]
|
||||
|
||||
MsgIdProvider* =
|
||||
proc(m: Message): MessageID {.noSideEffect, raises: [Defect], gcsafe.}
|
||||
proc(m: Message): Result[MessageID, ValidationResult] {.noSideEffect, raises: [Defect], gcsafe.}
|
||||
|
||||
SubscriptionValidator* =
|
||||
proc(topic: string): bool {.raises: [Defect], gcsafe.}
|
||||
@@ -452,6 +451,11 @@ proc subscribe*(p: PubSub,
|
||||
## on every received message
|
||||
##
|
||||
|
||||
# Check that this is an allowed topic
|
||||
if p.subscriptionValidator != nil and p.subscriptionValidator(topic) == false:
|
||||
warn "Trying to subscribe to a topic not passing validation!", topic
|
||||
return
|
||||
|
||||
p.topics.withValue(topic, handlers) do:
|
||||
# Already subscribed, just adding another handler
|
||||
handlers[].add(handler)
|
||||
|
||||
@@ -16,9 +16,10 @@ import ./messages,
|
||||
../../../peerid,
|
||||
../../../peerinfo,
|
||||
../../../crypto/crypto,
|
||||
../../../protobuf/minprotobuf
|
||||
../../../protobuf/minprotobuf,
|
||||
../../../protocols/pubsub/errors
|
||||
|
||||
export messages
|
||||
export errors, messages
|
||||
|
||||
logScope:
|
||||
topics = "pubsubmessage"
|
||||
@@ -28,16 +29,12 @@ const PubSubPrefix = toBytes("libp2p-pubsub:")
|
||||
declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages")
|
||||
declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
|
||||
|
||||
func defaultMsgIdProvider*(m: Message): MessageID =
|
||||
let mid =
|
||||
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
|
||||
byteutils.toHex(m.seqno) & $m.fromPeer
|
||||
else:
|
||||
# This part is irrelevant because it's not standard,
|
||||
# We use it exclusively for testing basically and users should
|
||||
# implement their own logic in the case they use anonymization
|
||||
$m.data.hash & $m.topicIDs.hash
|
||||
mid.toBytes()
|
||||
func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] =
|
||||
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
|
||||
let mid = byteutils.toHex(m.seqno) & $m.fromPeer
|
||||
ok mid.toBytes()
|
||||
else:
|
||||
err ValidationResult.Reject
|
||||
|
||||
proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
|
||||
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes())
|
||||
|
||||
@@ -10,14 +10,17 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import options, sequtils
|
||||
import ../../../utility
|
||||
import ../../../peerid
|
||||
import "../../.."/[
|
||||
peerid,
|
||||
routing_record,
|
||||
utility
|
||||
]
|
||||
|
||||
export options
|
||||
|
||||
type
|
||||
PeerInfoMsg* = object
|
||||
peerId*: seq[byte]
|
||||
peerId*: PeerId
|
||||
signedPeerRecord*: seq[byte]
|
||||
|
||||
SubOpts* = object
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import std/[sequtils, times]
|
||||
import std/[sequtils, times, sugar]
|
||||
import pkg/stew/[results, byteutils]
|
||||
import
|
||||
multiaddress,
|
||||
@@ -22,11 +22,6 @@ import
|
||||
|
||||
export peerid, multiaddress, signed_envelope
|
||||
|
||||
## Constants relating to signed peer records
|
||||
const
|
||||
EnvelopeDomain = multiCodec("libp2p-peer-record") # envelope domain as per RFC0002
|
||||
EnvelopePayloadType= @[(byte) 0x03, (byte) 0x01] # payload_type for routing records as spec'ed in RFC0003
|
||||
|
||||
type
|
||||
AddressInfo* = object
|
||||
address*: MultiAddress
|
||||
@@ -76,8 +71,9 @@ proc encode*(record: PeerRecord): seq[byte] =
|
||||
|
||||
proc init*(T: typedesc[PeerRecord],
|
||||
peerId: PeerId,
|
||||
seqNo: uint64,
|
||||
addresses: seq[MultiAddress]): T =
|
||||
addresses: seq[MultiAddress],
|
||||
seqNo = getTime().toUnix().uint64 # follows the recommended implementation, using unix epoch as seq no.
|
||||
): T =
|
||||
|
||||
PeerRecord(
|
||||
peerId: peerId,
|
||||
@@ -87,39 +83,13 @@ proc init*(T: typedesc[PeerRecord],
|
||||
|
||||
|
||||
## Functions related to signed peer records
|
||||
type SignedPeerRecord* = SignedPayload[PeerRecord]
|
||||
|
||||
proc init*(T: typedesc[Envelope],
|
||||
privateKey: PrivateKey,
|
||||
peerRecord: PeerRecord): Result[Envelope, CryptoError] =
|
||||
|
||||
## Init a signed envelope wrapping a peer record
|
||||
proc payloadDomain*(T: typedesc[PeerRecord]): string = $multiCodec("libp2p-peer-record")
|
||||
proc payloadType*(T: typedesc[PeerRecord]): seq[byte] = @[(byte) 0x03, (byte) 0x01]
|
||||
|
||||
let envelope = ? Envelope.init(privateKey,
|
||||
EnvelopePayloadType,
|
||||
peerRecord.encode(),
|
||||
$EnvelopeDomain)
|
||||
|
||||
ok(envelope)
|
||||
|
||||
proc init*(T: typedesc[Envelope],
|
||||
peerId: PeerId,
|
||||
addresses: seq[MultiAddress],
|
||||
privateKey: PrivateKey): Result[Envelope, CryptoError] =
|
||||
## Creates a signed peer record for this peer:
|
||||
## a peer routing record according to https://github.com/libp2p/specs/blob/500a7906dd7dd8f64e0af38de010ef7551fd61b6/RFC/0003-routing-records.md
|
||||
## in a signed envelope according to https://github.com/libp2p/specs/blob/500a7906dd7dd8f64e0af38de010ef7551fd61b6/RFC/0002-signed-envelopes.md
|
||||
|
||||
# First create a peer record from the peer info
|
||||
let peerRecord = PeerRecord.init(peerId,
|
||||
getTime().toUnix().uint64, # This currently follows the recommended implementation, using unix epoch as seq no.
|
||||
addresses)
|
||||
|
||||
let envelope = ? Envelope.init(privateKey,
|
||||
peerRecord)
|
||||
|
||||
ok(envelope)
|
||||
|
||||
proc getSignedPeerRecord*(pb: ProtoBuffer, field: int,
|
||||
value: var Envelope): ProtoResult[bool] {.
|
||||
inline.} =
|
||||
getField(pb, field, value, $EnvelopeDomain)
|
||||
proc checkValid*(spr: SignedPeerRecord): Result[void, EnvelopeError] =
|
||||
if not spr.data.peerId.match(spr.envelope.publicKey):
|
||||
err(EnvelopeInvalidSignature)
|
||||
else:
|
||||
ok()
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import std/sugar
|
||||
import pkg/stew/[results, byteutils]
|
||||
import multicodec,
|
||||
crypto/crypto,
|
||||
@@ -23,7 +24,8 @@ type
|
||||
EnvelopeError* = enum
|
||||
EnvelopeInvalidProtobuf,
|
||||
EnvelopeFieldMissing,
|
||||
EnvelopeInvalidSignature
|
||||
EnvelopeInvalidSignature,
|
||||
EnvelopeWrongType
|
||||
|
||||
Envelope* = object
|
||||
publicKey*: PublicKey
|
||||
@@ -116,3 +118,52 @@ proc getField*(pb: ProtoBuffer, field: int,
|
||||
ok(true)
|
||||
else:
|
||||
err(ProtoError.IncorrectBlob)
|
||||
|
||||
type
|
||||
SignedPayload*[T] = object
|
||||
# T needs to have .encode(), .decode(), .payloadType(), .domain()
|
||||
envelope*: Envelope
|
||||
data*: T
|
||||
|
||||
proc init*[T](_: typedesc[SignedPayload[T]],
|
||||
privateKey: PrivateKey,
|
||||
data: T): Result[SignedPayload[T], CryptoError] =
|
||||
mixin encode
|
||||
|
||||
let envelope = ? Envelope.init(privateKey,
|
||||
T.payloadType(),
|
||||
data.encode(),
|
||||
T.payloadDomain)
|
||||
|
||||
ok(SignedPayload[T](data: data, envelope: envelope))
|
||||
|
||||
proc getField*[T](pb: ProtoBuffer, field: int,
|
||||
value: var SignedPayload[T]): ProtoResult[bool] {.
|
||||
inline.} =
|
||||
if not ? getField(pb, field, value.envelope, T.payloadDomain):
|
||||
ok(false)
|
||||
else:
|
||||
mixin decode
|
||||
value.data = ? T.decode(value.envelope.payload).mapErr(x => ProtoError.IncorrectBlob)
|
||||
ok(true)
|
||||
|
||||
proc decode*[T](
|
||||
_: typedesc[SignedPayload[T]],
|
||||
buffer: seq[byte]
|
||||
): Result[SignedPayload[T], EnvelopeError] =
|
||||
|
||||
let
|
||||
envelope = ? Envelope.decode(buffer, T.payloadDomain)
|
||||
data = ? T.decode(envelope.payload).mapErr(x => EnvelopeInvalidProtobuf)
|
||||
signedPayload = SignedPayload[T](envelope: envelope, data: data)
|
||||
|
||||
if envelope.payloadType != T.payloadType:
|
||||
return err(EnvelopeWrongType)
|
||||
|
||||
when compiles(? signedPayload.checkValid()):
|
||||
? signedPayload.checkValid()
|
||||
|
||||
ok(signedPayload)
|
||||
|
||||
proc encode*[T](msg: SignedPayload[T]): Result[seq[byte], CryptoError] =
|
||||
msg.envelope.encode()
|
||||
|
||||
225
libp2p/stream/ringbuffer.nim
Normal file
225
libp2p/stream/ringbuffer.nim
Normal file
@@ -0,0 +1,225 @@
|
||||
## Nim-LibP2P
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
# Could be in chronos
|
||||
|
||||
import std/[strformat, sequtils]
|
||||
import stew/byteutils
|
||||
import chronos, chronicles
|
||||
|
||||
## Sync part
|
||||
##
|
||||
## Ring buffer where writep == readp means empty
|
||||
## and writep + & == readp means full.
|
||||
## This wastes a slot of the buf that will never be used
|
||||
##
|
||||
## Instead of doing clever modulos, ifs are used for clarity
|
||||
type
|
||||
RingBuffer*[T] = object
|
||||
buf: seq[T] # Data store
|
||||
readp: int # Reading position
|
||||
writep: int # Writing position
|
||||
full: bool
|
||||
|
||||
func isFull*[T](rb: RingBuffer[T]): bool =
|
||||
rb.full
|
||||
|
||||
func isEmpty*[T](rb: RingBuffer[T]): bool =
|
||||
rb.writep == rb.readp and not rb.full
|
||||
|
||||
func len*[T](rb: RingBuffer[T]): int =
|
||||
if rb.full: rb.buf.len
|
||||
elif rb.writep < rb.readp: rb.writep + rb.buf.len - rb.readp
|
||||
else: rb.writep - rb.readp
|
||||
|
||||
proc write*[T](rb: var RingBuffer[T], data: openArray[T]): int =
|
||||
## Add data
|
||||
|
||||
let
|
||||
readPosRelative =
|
||||
if rb.readp < rb.writep: rb.readp + rb.buf.len
|
||||
else: rb.readp
|
||||
|
||||
canAddBeforeFull =
|
||||
if rb.isEmpty: min(rb.buf.len, data.len)
|
||||
else: min(readPosRelative - rb.writep, data.len)
|
||||
|
||||
canAddBeforeSplit = min(rb.buf.len - rb.writep, data.len)
|
||||
|
||||
canAdd = min(canAddBeforeFull, canAddBeforeSplit)
|
||||
|
||||
copyMem(addr rb.buf[rb.writep], unsafeAddr data[0], canAdd)
|
||||
rb.writep += canAdd
|
||||
|
||||
if canAdd < canAddBeforeFull:
|
||||
let splittedCount = canAddBeforeFull - canAddBeforeSplit
|
||||
copyMem(addr rb.buf[0], unsafeAddr data[canAdd], splittedCount)
|
||||
rb.writep = splittedCount
|
||||
|
||||
if rb.writeP == rb.buf.len: rb.writeP = 0
|
||||
|
||||
if rb.writeP == rb.readP: rb.full = true
|
||||
|
||||
canAddBeforeFull
|
||||
|
||||
proc read_internal[T](rb: var RingBuffer[T], buffer: var openArray[T]): int =
|
||||
## This will chunk at the end of the circular buffer
|
||||
## to avoid allocations
|
||||
let
|
||||
continuousInBuffer =
|
||||
if rb.writep < rb.readp or rb.full: rb.buf.len - rb.readp
|
||||
else: rb.writep - rb.readp
|
||||
canRead = min(buffer.len, continuousInBuffer)
|
||||
|
||||
copyMem(unsafeAddr buffer[0], addr rb.buf[rb.readp], canRead)
|
||||
rb.readp += canRead
|
||||
if rb.readp == rb.buf.len: rb.readp = 0
|
||||
rb.full = false
|
||||
canRead
|
||||
|
||||
proc read*[T](rb: var RingBuffer[T], buffer: var openArray[T]): int =
|
||||
rb.read_internal(buffer)
|
||||
|
||||
proc readAll*[T](rb: var RingBuffer[T]): seq[T] =
|
||||
result = newSeq[T](rb.len())
|
||||
let readFirst = rb.read_internal(result)
|
||||
if readFirst < result.len:
|
||||
discard rb.read_internal(result.toOpenArray(readFirst, result.len - 1))
|
||||
|
||||
proc init*[T](R: typedesc[RingBuffer[T]], size: int): R =
|
||||
RingBuffer[T](
|
||||
buf: newSeq[T](size)
|
||||
)
|
||||
|
||||
when isMainModule:
|
||||
var totalI = 0
|
||||
proc getSeq(i: int): seq[byte] =
|
||||
for x in 0..<i: result.add((totalI + x).byte)
|
||||
totalI.inc(i)
|
||||
#var rb = RingBuffer[byte].init(12)
|
||||
#var buffer = newSeq[byte](30)
|
||||
#echo "Sync"
|
||||
#echo rb.write(getSeq(3))
|
||||
#echo rb.write(getSeq(6))
|
||||
#echo rb.readAll()
|
||||
#echo rb.write(getSeq(6))
|
||||
#echo rb
|
||||
#echo rb.write(getSeq(6))
|
||||
#echo rb.readAll()
|
||||
|
||||
#echo rb.write(getSeq(30))
|
||||
#echo rb
|
||||
|
||||
## Async part
|
||||
type
|
||||
AsyncRingBuffer*[T] = ref object
|
||||
ring: RingBuffer[T]
|
||||
#TODO handle a single getter
|
||||
getters: seq[Future[void]]
|
||||
putters: seq[Future[void]]
|
||||
|
||||
# Borrows
|
||||
func isFull*[T](rb: AsyncRingBuffer[T]): bool = rb.ring.isFull()
|
||||
func isEmpty*[T](rb: AsyncRingBuffer[T]): bool = rb.ring.isEmpty()
|
||||
func len*[T](rb: AsyncRingBuffer[T]): int = rb.ring.len()
|
||||
|
||||
# Stolen from chronos
|
||||
proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} =
|
||||
var i = 0
|
||||
while i < len(waiters):
|
||||
var waiter = waiters[i]
|
||||
inc(i)
|
||||
|
||||
if not(waiter.finished()):
|
||||
waiter.complete()
|
||||
break
|
||||
|
||||
if i > 0:
|
||||
waiters.delete(0, i - 1)
|
||||
|
||||
#proc write*[T](rb: AsyncRingBuffer[T], data: seq[T]) {.async.} =
|
||||
# # First, get a slot
|
||||
# while rb.isFull():
|
||||
# var putter = newFuture[void]("RingBuffer.write")
|
||||
# rb.putters.add(putter)
|
||||
# await putter
|
||||
#
|
||||
# # Now we must write everything without getting our slot stollen
|
||||
# var written = rb.ring.write(data)
|
||||
# rb.getters.wakeUpNext()
|
||||
#
|
||||
# while written < data.len:
|
||||
# var putter = newFuture[void]("RingBuffer.write")
|
||||
# # We are prioritary
|
||||
# rb.putters.insert(putter, 0)
|
||||
# await putter
|
||||
#
|
||||
# written += rb.ring.write(data.toOpenArray(written, data.len - 1))
|
||||
# rb.getters.wakeUpNext()
|
||||
#
|
||||
# if not rb.isFull(): #Room for the next one
|
||||
# rb.putters.wakeUpNext()
|
||||
|
||||
proc write*[T](rb: AsyncRingBuffer[T], data: ptr UncheckedArray[T], size: int) {.async.} =
|
||||
# First, get a slot
|
||||
while rb.isFull():
|
||||
var putter = newFuture[void]("RingBuffer.write")
|
||||
rb.putters.add(putter)
|
||||
await putter
|
||||
|
||||
# Now we must write everything without getting our slot stollen
|
||||
var written = rb.ring.write(toOpenArray(data, 0, size))
|
||||
rb.getters.wakeUpNext()
|
||||
|
||||
while written < size:
|
||||
var putter = newFuture[void]("RingBuffer.write")
|
||||
# We are prioritary
|
||||
rb.putters.insert(putter, 0)
|
||||
await putter
|
||||
|
||||
written += rb.ring.write(data.toOpenArray(written, size - 1))
|
||||
rb.getters.wakeUpNext()
|
||||
|
||||
if not rb.isFull(): #Room for the next one
|
||||
rb.putters.wakeUpNext()
|
||||
|
||||
proc read*[T](rb: AsyncRingBuffer[T], maxRead: int = 10000): Future[seq[T]] {.async.} =
|
||||
while rb.isEmpty():
|
||||
var getter = newFuture[void]("RingBuffer.read")
|
||||
rb.getters.add(getter)
|
||||
await getter
|
||||
|
||||
let res = rb.ring.readAll()
|
||||
|
||||
rb.putters.wakeUpNext()
|
||||
# Since we read everything, we won't wake up other readers
|
||||
|
||||
return res
|
||||
|
||||
proc new*[T](R: typedesc[AsyncRingBuffer[T]], size: int): R =
|
||||
AsyncRingBuffer[T](
|
||||
ring: RingBuffer[T].init(size)
|
||||
)
|
||||
|
||||
when isMainModule:
|
||||
proc testA {.async.} =
|
||||
var rb = AsyncRingBuffer[byte].new(3)
|
||||
#await rb.write(getSeq(6))
|
||||
let toSend = getSeq(30)
|
||||
let tooBigWrite = rb.write(cast[ptr UncheckedArray[byte]](unsafeAddr toSend[0]), 30)# and rb.write(UncheckedArray(getSeq(70)), 70)
|
||||
while rb.len > 0 or not tooBigWrite.finished:
|
||||
let reader = rb.read()
|
||||
await reader or tooBigWrite
|
||||
if reader.finished:
|
||||
echo await reader
|
||||
else:
|
||||
await reader.cancelAndWait()
|
||||
|
||||
echo "Async"
|
||||
waitFor(testA())
|
||||
@@ -99,8 +99,9 @@ proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe.} =
|
||||
method connect*(
|
||||
s: Switch,
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress]): Future[void] =
|
||||
s.dialer.connect(peerId, addrs)
|
||||
addrs: seq[MultiAddress],
|
||||
forceDial = false): Future[void] =
|
||||
s.dialer.connect(peerId, addrs, forceDial)
|
||||
|
||||
method dial*(
|
||||
s: Switch,
|
||||
@@ -117,8 +118,9 @@ method dial*(
|
||||
s: Switch,
|
||||
peerId: PeerId,
|
||||
addrs: seq[MultiAddress],
|
||||
protos: seq[string]): Future[Connection] =
|
||||
s.dialer.dial(peerId, addrs, protos)
|
||||
protos: seq[string],
|
||||
forceDial = false): Future[Connection] =
|
||||
s.dialer.dial(peerId, addrs, protos, forceDial)
|
||||
|
||||
proc dial*(
|
||||
s: Switch,
|
||||
|
||||
@@ -30,6 +30,8 @@ export transport, websock
|
||||
const
|
||||
WsTransportTrackerName* = "libp2p.wstransport"
|
||||
|
||||
DefaultHeadersTimeout = 3.seconds
|
||||
|
||||
type
|
||||
WsStream = ref object of Connection
|
||||
session: WSSession
|
||||
@@ -49,11 +51,24 @@ proc new*(T: type WsStream,
|
||||
stream.initStream()
|
||||
return stream
|
||||
|
||||
template mapExceptions(body: untyped) =
|
||||
try:
|
||||
body
|
||||
except AsyncStreamIncompleteError:
|
||||
raise newLPStreamEOFError()
|
||||
except AsyncStreamUseClosedError:
|
||||
raise newLPStreamEOFError()
|
||||
except WSClosedError:
|
||||
raise newLPStreamEOFError()
|
||||
except AsyncStreamLimitError:
|
||||
raise newLPStreamLimitError()
|
||||
|
||||
method readOnce*(
|
||||
s: WsStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int): Future[int] {.async.} =
|
||||
let res = await s.session.recv(pbytes, nbytes)
|
||||
let res = mapExceptions(await s.session.recv(pbytes, nbytes))
|
||||
|
||||
if res == 0 and s.session.readyState == ReadyState.Closed:
|
||||
raise newLPStreamEOFError()
|
||||
return res
|
||||
@@ -61,10 +76,7 @@ method readOnce*(
|
||||
method write*(
|
||||
s: WsStream,
|
||||
msg: seq[byte]): Future[void] {.async.} =
|
||||
try:
|
||||
await s.session.send(msg, Opcode.Binary)
|
||||
except WSClosedError:
|
||||
raise newLPStreamEOFError()
|
||||
mapExceptions(await s.session.send(msg, Opcode.Binary))
|
||||
|
||||
method closeImpl*(s: WsStream): Future[void] {.async.} =
|
||||
await s.session.close()
|
||||
@@ -82,6 +94,7 @@ type
|
||||
tlsCertificate: TLSCertificate
|
||||
tlsFlags: set[TLSFlags]
|
||||
flags: set[ServerFlags]
|
||||
handshakeTimeout: Duration
|
||||
factories: seq[ExtFactory]
|
||||
rng: Rng
|
||||
|
||||
@@ -121,9 +134,13 @@ method start*(
|
||||
address = ma.initTAddress().tryGet(),
|
||||
tlsPrivateKey = self.tlsPrivateKey,
|
||||
tlsCertificate = self.tlsCertificate,
|
||||
flags = self.flags)
|
||||
flags = self.flags,
|
||||
handshakeTimeout = self.handshakeTimeout)
|
||||
else:
|
||||
HttpServer.create(ma.initTAddress().tryGet())
|
||||
HttpServer.create(
|
||||
ma.initTAddress().tryGet(),
|
||||
handshakeTimeout = self.handshakeTimeout
|
||||
)
|
||||
|
||||
self.httpservers &= httpserver
|
||||
|
||||
@@ -212,19 +229,19 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
|
||||
if not self.running:
|
||||
raise newTransportClosedError()
|
||||
|
||||
if self.acceptFuts.len <= 0:
|
||||
self.acceptFuts = self.httpservers.mapIt(it.accept())
|
||||
|
||||
if self.acceptFuts.len <= 0:
|
||||
return
|
||||
|
||||
let
|
||||
finished = await one(self.acceptFuts)
|
||||
index = self.acceptFuts.find(finished)
|
||||
|
||||
self.acceptFuts[index] = self.httpservers[index].accept()
|
||||
|
||||
try:
|
||||
if self.acceptFuts.len <= 0:
|
||||
self.acceptFuts = self.httpservers.mapIt(it.accept())
|
||||
|
||||
if self.acceptFuts.len <= 0:
|
||||
return
|
||||
|
||||
let
|
||||
finished = await one(self.acceptFuts)
|
||||
index = self.acceptFuts.find(finished)
|
||||
|
||||
self.acceptFuts[index] = self.httpservers[index].accept()
|
||||
|
||||
let req = await finished
|
||||
|
||||
try:
|
||||
@@ -240,6 +257,8 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
|
||||
debug "OS Error", exc = exc.msg
|
||||
except WebSocketError as exc:
|
||||
debug "Websocket Error", exc = exc.msg
|
||||
except HttpError as exc:
|
||||
debug "Http Error", exc = exc.msg
|
||||
except AsyncStreamError as exc:
|
||||
debug "AsyncStream Error", exc = exc.msg
|
||||
except TransportTooManyError as exc:
|
||||
@@ -291,7 +310,8 @@ proc new*(
|
||||
tlsFlags: set[TLSFlags] = {},
|
||||
flags: set[ServerFlags] = {},
|
||||
factories: openArray[ExtFactory] = [],
|
||||
rng: Rng = nil): T =
|
||||
rng: Rng = nil,
|
||||
handshakeTimeout = DefaultHeadersTimeout): T =
|
||||
|
||||
T(
|
||||
upgrader: upgrade,
|
||||
@@ -300,14 +320,16 @@ proc new*(
|
||||
tlsFlags: tlsFlags,
|
||||
flags: flags,
|
||||
factories: @factories,
|
||||
rng: rng)
|
||||
rng: rng,
|
||||
handshakeTimeout: handshakeTimeout)
|
||||
|
||||
proc new*(
|
||||
T: typedesc[WsTransport],
|
||||
upgrade: Upgrade,
|
||||
flags: set[ServerFlags] = {},
|
||||
factories: openArray[ExtFactory] = [],
|
||||
rng: Rng = nil): T =
|
||||
rng: Rng = nil,
|
||||
handshakeTimeout = DefaultHeadersTimeout): T =
|
||||
|
||||
T.new(
|
||||
upgrade = upgrade,
|
||||
@@ -315,4 +337,5 @@ proc new*(
|
||||
tlsCertificate = nil,
|
||||
flags = flags,
|
||||
factories = @factories,
|
||||
rng = rng)
|
||||
rng = rng,
|
||||
handshakeTimeout = handshakeTimeout)
|
||||
|
||||
@@ -54,16 +54,21 @@ proc acquire*(s: AsyncSemaphore): Future[void] =
|
||||
fut.cancelCallback = nil
|
||||
if not fut.finished:
|
||||
s.queue.keepItIf( it != fut )
|
||||
s.count.inc
|
||||
|
||||
fut.cancelCallback = cancellation
|
||||
|
||||
s.queue.add(fut)
|
||||
s.count.dec
|
||||
|
||||
trace "Queued slot", available = s.count, queue = s.queue.len
|
||||
return fut
|
||||
|
||||
proc forceAcquire*(s: AsyncSemaphore) =
|
||||
## ForceAcquire will always succeed,
|
||||
## creating a temporary slot if required.
|
||||
## This temporary slot will stay usable until
|
||||
## there is less `acquire`s than `release`s
|
||||
s.count.dec
|
||||
|
||||
proc release*(s: AsyncSemaphore) =
|
||||
## Release a resource from the semaphore,
|
||||
## by picking the first future from the queue
|
||||
@@ -77,13 +82,15 @@ proc release*(s: AsyncSemaphore) =
|
||||
trace "Releasing slot", available = s.count,
|
||||
queue = s.queue.len
|
||||
|
||||
if s.queue.len > 0:
|
||||
s.count.inc
|
||||
while s.queue.len > 0:
|
||||
var fut = s.queue[0]
|
||||
s.queue.delete(0)
|
||||
if not fut.finished():
|
||||
s.count.dec
|
||||
fut.complete()
|
||||
break
|
||||
|
||||
s.count.inc # increment the resource count
|
||||
trace "Released slot", available = s.count,
|
||||
queue = s.queue.len
|
||||
return
|
||||
|
||||
@@ -137,6 +137,10 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||
await transport1.stop()
|
||||
|
||||
asyncTest "e2e should allow multiple local addresses":
|
||||
when defined(windows):
|
||||
# this randomly locks the Windows CI job
|
||||
skip()
|
||||
return
|
||||
let addrs = @[MultiAddress.init(ma).tryGet(),
|
||||
MultiAddress.init(ma).tryGet()]
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ import utils,
|
||||
protocols/pubsub/floodsub,
|
||||
protocols/pubsub/rpc/messages,
|
||||
protocols/pubsub/peertable]
|
||||
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
|
||||
|
||||
import ../helpers
|
||||
|
||||
|
||||
@@ -39,6 +39,8 @@ proc randomPeerId(): PeerId =
|
||||
except CatchableError as exc:
|
||||
raise newException(Defect, exc.msg)
|
||||
|
||||
const MsgIdSuccess = "msg id gen success"
|
||||
|
||||
suite "GossipSub internal":
|
||||
teardown:
|
||||
checkTrackers()
|
||||
@@ -308,7 +310,7 @@ suite "GossipSub internal":
|
||||
conn.peerId = peerId
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
|
||||
|
||||
check gossipSub.fanout[topic].len == 15
|
||||
check gossipSub.mesh[topic].len == 15
|
||||
@@ -355,7 +357,7 @@ suite "GossipSub internal":
|
||||
conn.peerId = peerId
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == gossipSub.parameters.d
|
||||
@@ -396,7 +398,7 @@ suite "GossipSub internal":
|
||||
conn.peerId = peerId
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == gossipSub.parameters.d
|
||||
@@ -437,7 +439,7 @@ suite "GossipSub internal":
|
||||
conn.peerId = peerId
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdSuccess), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == 0
|
||||
|
||||
@@ -24,6 +24,7 @@ import utils, ../../libp2p/[errors,
|
||||
protocols/pubsub/peertable,
|
||||
protocols/pubsub/timedcache,
|
||||
protocols/pubsub/rpc/messages]
|
||||
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
|
||||
import ../helpers
|
||||
|
||||
proc `$`(peer: PubSubPeer): string = shortLog(peer)
|
||||
@@ -564,6 +565,72 @@ suite "GossipSub":
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
check observed == 2
|
||||
|
||||
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
|
||||
var passed = newFuture[void]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
check topic == "foobar"
|
||||
passed.complete()
|
||||
|
||||
let
|
||||
nodes = generateNodes(
|
||||
2,
|
||||
gossip = true,
|
||||
unsubscribeBackoff = 10.minutes)
|
||||
|
||||
# start switches
|
||||
nodesFut = await allFinished(
|
||||
nodes[0].switch.start(),
|
||||
nodes[1].switch.start(),
|
||||
)
|
||||
|
||||
# start pubsub
|
||||
await allFuturesThrowing(
|
||||
allFinished(
|
||||
nodes[0].start(),
|
||||
nodes[1].start(),
|
||||
))
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
nodes[1].subscribe("foobar", handler)
|
||||
nodes[0].subscribe("foobar", handler)
|
||||
await waitSub(nodes[0], nodes[1], "foobar")
|
||||
await waitSub(nodes[1], nodes[0], "foobar")
|
||||
|
||||
nodes[0].unsubscribe("foobar", handler)
|
||||
|
||||
let gsNode = GossipSub(nodes[1])
|
||||
check await checkExpiring(gsNode.mesh.getOrDefault("foobar").len == 0)
|
||||
|
||||
nodes[0].subscribe("foobar", handler)
|
||||
|
||||
check GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
|
||||
|
||||
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
|
||||
|
||||
check:
|
||||
GossipSub(nodes[0]).fanout.getOrDefault("foobar").len > 0
|
||||
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
|
||||
|
||||
await passed.wait(2.seconds)
|
||||
|
||||
trace "test done, stopping..."
|
||||
|
||||
await nodes[0].stop()
|
||||
await nodes[1].stop()
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].switch.stop(),
|
||||
nodes[1].switch.stop()
|
||||
)
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].stop(),
|
||||
nodes[1].stop()
|
||||
)
|
||||
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
|
||||
asyncTest "e2e - GossipSub send over mesh A -> B":
|
||||
var passed: Future[bool] = newFuture[bool]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
@@ -865,3 +932,75 @@ suite "GossipSub":
|
||||
it.switch.stop())))
|
||||
|
||||
await allFuturesThrowing(nodesFut)
|
||||
|
||||
asyncTest "e2e - GossipSub peer exchange":
|
||||
# A, B & C are subscribed to something
|
||||
# B unsubcribe from it, it should send
|
||||
# PX to A & C
|
||||
#
|
||||
# C sent his SPR, not A
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
discard # not used in this test
|
||||
|
||||
let
|
||||
nodes = generateNodes(
|
||||
2,
|
||||
gossip = true) &
|
||||
generateNodes(1, gossip = true, sendSignedPeerRecord = true)
|
||||
|
||||
# start switches
|
||||
nodesFut = await allFinished(
|
||||
nodes[0].switch.start(),
|
||||
nodes[1].switch.start(),
|
||||
nodes[2].switch.start(),
|
||||
)
|
||||
|
||||
# start pubsub
|
||||
await allFuturesThrowing(
|
||||
allFinished(
|
||||
nodes[0].start(),
|
||||
nodes[1].start(),
|
||||
nodes[2].start(),
|
||||
))
|
||||
|
||||
var
|
||||
gossip0 = GossipSub(nodes[0])
|
||||
gossip1 = GossipSub(nodes[1])
|
||||
gossip2 = GossipSub(nodes[1])
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
nodes[0].subscribe("foobar", handler)
|
||||
nodes[1].subscribe("foobar", handler)
|
||||
nodes[2].subscribe("foobar", handler)
|
||||
for x in 0..<3:
|
||||
for y in 0..<3:
|
||||
if x != y:
|
||||
await waitSub(nodes[x], nodes[y], "foobar")
|
||||
|
||||
var passed: Future[void] = newFuture[void]()
|
||||
gossip0.routingRecordsHandler.add(proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
|
||||
check:
|
||||
tag == "foobar"
|
||||
peers.len == 2
|
||||
peers[0].record.isSome() xor peers[1].record.isSome()
|
||||
passed.complete()
|
||||
)
|
||||
nodes[1].unsubscribe("foobar", handler)
|
||||
|
||||
await passed
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].switch.stop(),
|
||||
nodes[1].switch.stop(),
|
||||
nodes[2].switch.stop()
|
||||
)
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].stop(),
|
||||
nodes[1].stop(),
|
||||
nodes[2].stop()
|
||||
)
|
||||
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
|
||||
|
||||
@@ -147,6 +147,9 @@ suite "GossipSub":
|
||||
nodes[1].start(),
|
||||
))
|
||||
|
||||
# We must subscribe before setting the validator
|
||||
nodes[0].subscribe("foobar", handler)
|
||||
|
||||
var gossip = GossipSub(nodes[0])
|
||||
let invalidDetected = newFuture[void]()
|
||||
gossip.subscriptionValidator =
|
||||
@@ -162,7 +165,6 @@ suite "GossipSub":
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
nodes[0].subscribe("foobar", handler)
|
||||
nodes[1].subscribe("foobar", handler)
|
||||
|
||||
await invalidDetected.wait(10.seconds)
|
||||
|
||||
@@ -5,19 +5,21 @@ import stew/byteutils
|
||||
import ../../libp2p/[peerid,
|
||||
crypto/crypto,
|
||||
protocols/pubsub/mcache,
|
||||
protocols/pubsub/rpc/message,
|
||||
protocols/pubsub/rpc/messages]
|
||||
import ./utils
|
||||
|
||||
var rng = newRng()
|
||||
|
||||
proc randomPeerId(): PeerId =
|
||||
PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get()
|
||||
|
||||
const MsgIdGenSuccess = "msg id generation success"
|
||||
|
||||
suite "MCache":
|
||||
test "put/get":
|
||||
var mCache = MCache.init(3, 5)
|
||||
var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes())
|
||||
let msgId = defaultMsgIdProvider(msg)
|
||||
let msgId = defaultMsgIdProvider(msg).expect(MsgIdGenSuccess)
|
||||
mCache.put(msgId, msg)
|
||||
check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg
|
||||
|
||||
@@ -28,13 +30,13 @@ suite "MCache":
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["foo"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
|
||||
|
||||
for i in 0..<5:
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["bar"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
|
||||
|
||||
var mids = mCache.window("foo")
|
||||
check mids.len == 3
|
||||
@@ -49,7 +51,7 @@ suite "MCache":
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["foo"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
|
||||
|
||||
mCache.shift()
|
||||
check mCache.window("foo").len == 0
|
||||
@@ -58,7 +60,7 @@ suite "MCache":
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["bar"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
|
||||
|
||||
mCache.shift()
|
||||
check mCache.window("bar").len == 0
|
||||
@@ -67,7 +69,7 @@ suite "MCache":
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["baz"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
|
||||
|
||||
mCache.shift()
|
||||
check mCache.window("baz").len == 0
|
||||
@@ -79,19 +81,19 @@ suite "MCache":
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["foo"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
|
||||
|
||||
for i in 0..<3:
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["bar"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
|
||||
|
||||
for i in 0..<3:
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["baz"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)
|
||||
|
||||
mCache.shift()
|
||||
check mCache.window("foo").len == 0
|
||||
|
||||
@@ -3,8 +3,10 @@ import unittest2
|
||||
{.used.}
|
||||
|
||||
import options
|
||||
import stew/byteutils
|
||||
import ../../libp2p/[peerid, peerinfo,
|
||||
crypto/crypto,
|
||||
protocols/pubsub/errors,
|
||||
protocols/pubsub/rpc/message,
|
||||
protocols/pubsub/rpc/messages]
|
||||
|
||||
@@ -18,3 +20,56 @@ suite "Message":
|
||||
msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
|
||||
|
||||
check verify(msg)
|
||||
|
||||
test "defaultMsgIdProvider success":
|
||||
let
|
||||
seqno = 11'u64
|
||||
pkHex =
|
||||
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
|
||||
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
|
||||
E731065A"""
|
||||
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
|
||||
.expect("valid private key bytes")
|
||||
peer = PeerInfo.new(seckey)
|
||||
msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
|
||||
msgIdResult = msg.defaultMsgIdProvider()
|
||||
|
||||
check:
|
||||
msgIdResult.isOk
|
||||
string.fromBytes(msgIdResult.get) ==
|
||||
"000000000000000b12D3KooWGyLzSt9g4U9TdHYDvVWAs5Ht4WrocgoyqPxxvnqAL8qw"
|
||||
|
||||
test "defaultMsgIdProvider error - no source peer id":
|
||||
let
|
||||
seqno = 11'u64
|
||||
pkHex =
|
||||
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
|
||||
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
|
||||
E731065A"""
|
||||
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
|
||||
.expect("valid private key bytes")
|
||||
peer = PeerInfo.new(seckey)
|
||||
|
||||
var msg = Message.init(peer.some, @[], "topic", some(seqno), sign = true)
|
||||
msg.fromPeer = PeerId()
|
||||
let msgIdResult = msg.defaultMsgIdProvider()
|
||||
|
||||
check:
|
||||
msgIdResult.isErr
|
||||
msgIdResult.error == ValidationResult.Reject
|
||||
|
||||
test "defaultMsgIdProvider error - no source seqno":
|
||||
let
|
||||
pkHex =
|
||||
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
|
||||
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
|
||||
E731065A"""
|
||||
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
|
||||
.expect("valid private key bytes")
|
||||
peer = PeerInfo.new(seckey)
|
||||
msg = Message.init(some(peer), @[], "topic", uint64.none, sign = true)
|
||||
msgIdResult = msg.defaultMsgIdProvider()
|
||||
|
||||
check:
|
||||
msgIdResult.isErr
|
||||
msgIdResult.error == ValidationResult.Reject
|
||||
|
||||
@@ -4,33 +4,48 @@ const
|
||||
libp2p_pubsub_verify {.booldefine.} = true
|
||||
libp2p_pubsub_anonymize {.booldefine.} = false
|
||||
|
||||
import random, tables
|
||||
import chronos
|
||||
import hashes, random, tables
|
||||
import chronos, stew/[byteutils, results]
|
||||
import ../../libp2p/[builders,
|
||||
protocols/pubsub/errors,
|
||||
protocols/pubsub/pubsub,
|
||||
protocols/pubsub/gossipsub,
|
||||
protocols/pubsub/floodsub,
|
||||
protocols/pubsub/rpc/messages,
|
||||
protocols/secure/secure]
|
||||
|
||||
export builders
|
||||
|
||||
randomize()
|
||||
|
||||
func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] =
|
||||
let mid =
|
||||
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
|
||||
byteutils.toHex(m.seqno) & $m.fromPeer
|
||||
else:
|
||||
# This part is irrelevant because it's not standard,
|
||||
# We use it exclusively for testing basically and users should
|
||||
# implement their own logic in the case they use anonymization
|
||||
$m.data.hash & $m.topicIDs.hash
|
||||
ok mid.toBytes()
|
||||
|
||||
proc generateNodes*(
|
||||
num: Natural,
|
||||
secureManagers: openArray[SecureProtocol] = [
|
||||
SecureProtocol.Noise
|
||||
],
|
||||
msgIdProvider: MsgIdProvider = nil,
|
||||
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
|
||||
gossip: bool = false,
|
||||
triggerSelf: bool = false,
|
||||
verifySignature: bool = libp2p_pubsub_verify,
|
||||
anonymize: bool = libp2p_pubsub_anonymize,
|
||||
sign: bool = libp2p_pubsub_sign,
|
||||
sendSignedPeerRecord = false,
|
||||
unsubscribeBackoff = 1.seconds,
|
||||
maxMessageSize: int = 1024 * 1024): seq[PubSub] =
|
||||
|
||||
for i in 0..<num:
|
||||
let switch = newStandardSwitch(secureManagers = secureManagers)
|
||||
let switch = newStandardSwitch(secureManagers = secureManagers, sendSignedPeerRecord = sendSignedPeerRecord)
|
||||
let pubsub = if gossip:
|
||||
let g = GossipSub.init(
|
||||
switch = switch,
|
||||
@@ -40,7 +55,7 @@ proc generateNodes*(
|
||||
msgIdProvider = msgIdProvider,
|
||||
anonymize = anonymize,
|
||||
maxMessageSize = maxMessageSize,
|
||||
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubscribeBackoff = 1.seconds; p))
|
||||
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubscribeBackoff = unsubscribeBackoff; p))
|
||||
# set some testing params, to enable scores
|
||||
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
|
||||
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0
|
||||
|
||||
@@ -463,3 +463,33 @@ suite "Connection Manager":
|
||||
await connMngr.close()
|
||||
await allFuturesThrowing(
|
||||
allFutures(conns.mapIt( it.close() )))
|
||||
|
||||
asyncTest "allow force dial":
|
||||
let connMngr = ConnManager.new(maxConnections = 2)
|
||||
|
||||
var conns: seq[Connection]
|
||||
for i in 0..<3:
|
||||
let conn = connMngr.trackOutgoingConn(
|
||||
(proc(): Future[Connection] {.async.} =
|
||||
return Connection.new(
|
||||
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
|
||||
Direction.In)
|
||||
), true
|
||||
)
|
||||
|
||||
check await conn.withTimeout(10.millis)
|
||||
conns.add(await conn)
|
||||
|
||||
# should throw adding a connection over the limit
|
||||
expect TooManyConnectionsError:
|
||||
discard await connMngr.trackOutgoingConn(
|
||||
(proc(): Future[Connection] {.async.} =
|
||||
return Connection.new(
|
||||
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
|
||||
Direction.In)
|
||||
), false
|
||||
)
|
||||
|
||||
await connMngr.close()
|
||||
await allFuturesThrowing(
|
||||
allFutures(conns.mapIt( it.close() )))
|
||||
|
||||
@@ -77,6 +77,7 @@ suite "Identify":
|
||||
check id.protoVersion.get() == ProtoVersion
|
||||
check id.agentVersion.get() == AgentVersion
|
||||
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
|
||||
check id.signedPeerRecord.isNone()
|
||||
|
||||
asyncTest "custom agent version":
|
||||
const customAgentVersion = "MY CUSTOM AGENT STRING"
|
||||
@@ -100,6 +101,7 @@ suite "Identify":
|
||||
check id.protoVersion.get() == ProtoVersion
|
||||
check id.agentVersion.get() == customAgentVersion
|
||||
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
|
||||
check id.signedPeerRecord.isNone()
|
||||
|
||||
asyncTest "handle failed identify":
|
||||
msListen.addHandler(IdentifyCodec, identifyProto1)
|
||||
@@ -123,6 +125,27 @@ suite "Identify":
|
||||
discard await msDial.select(conn, IdentifyCodec)
|
||||
discard await identifyProto2.identify(conn, pi2.peerId)
|
||||
|
||||
asyncTest "can send signed peer record":
|
||||
msListen.addHandler(IdentifyCodec, identifyProto1)
|
||||
identifyProto1.sendSignedPeerRecord = true
|
||||
serverFut = transport1.start(ma)
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
let c = await transport1.accept()
|
||||
await msListen.handle(c)
|
||||
|
||||
acceptFut = acceptHandler()
|
||||
conn = await transport2.dial(transport1.addrs[0])
|
||||
|
||||
discard await msDial.select(conn, IdentifyCodec)
|
||||
let id = await identifyProto2.identify(conn, remotePeerInfo.peerId)
|
||||
|
||||
check id.pubkey.get() == remoteSecKey.getPublicKey().get()
|
||||
check id.addrs == ma
|
||||
check id.protoVersion.get() == ProtoVersion
|
||||
check id.agentVersion.get() == AgentVersion
|
||||
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
|
||||
check id.signedPeerRecord.get() == remotePeerInfo.signedPeerRecord.get()
|
||||
|
||||
suite "handle push identify message":
|
||||
var
|
||||
switch1 {.threadvar.}: Switch
|
||||
@@ -160,6 +183,10 @@ suite "Identify":
|
||||
switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet()
|
||||
switch2.peerStore.addressBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.addrs.toHashSet()
|
||||
|
||||
#switch1.peerStore.signedPeerRecordBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.signedPeerRecord.get()
|
||||
#switch2.peerStore.signedPeerRecordBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.signedPeerRecord.get()
|
||||
# no longer sent by default
|
||||
|
||||
proc closeAll() {.async.} =
|
||||
await conn.close()
|
||||
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
{.used.}
|
||||
|
||||
import options, bearssl
|
||||
import chronos
|
||||
import chronos, stew/byteutils
|
||||
import ../libp2p/crypto/crypto,
|
||||
../libp2p/multicodec,
|
||||
../libp2p/peerinfo,
|
||||
../libp2p/peerid
|
||||
../libp2p/peerid,
|
||||
../libp2p/routing_record
|
||||
|
||||
import ./helpers
|
||||
|
||||
@@ -16,3 +18,32 @@ suite "PeerInfo":
|
||||
|
||||
check peerId == peerInfo.peerId
|
||||
check seckey.getPublicKey().get() == peerInfo.publicKey
|
||||
|
||||
test "Signed peer record":
|
||||
const
|
||||
ExpectedDomain = $multiCodec("libp2p-peer-record")
|
||||
ExpectedPayloadType = @[(byte) 0x03, (byte) 0x01]
|
||||
|
||||
let
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerID.init(seckey).get()
|
||||
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
|
||||
peerInfo = PeerInfo.new(seckey, multiAddresses)
|
||||
|
||||
let
|
||||
env = peerInfo.signedPeerRecord.get()
|
||||
rec = PeerRecord.decode(env.payload()).tryGet()
|
||||
|
||||
# Check envelope fields
|
||||
check:
|
||||
env.publicKey == peerInfo.publicKey
|
||||
env.domain == ExpectedDomain
|
||||
env.payloadType == ExpectedPayloadType
|
||||
|
||||
# Check payload (routing record)
|
||||
check:
|
||||
rec.peerId == peerId
|
||||
rec.seqNo > 0
|
||||
rec.addresses.len == 2
|
||||
rec.addresses[0].address == multiAddresses[0]
|
||||
rec.addresses[1].address == multiAddresses[1]
|
||||
|
||||
@@ -9,7 +9,7 @@ suite "Routing record":
|
||||
privKey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerId.init(privKey).tryGet()
|
||||
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
|
||||
routingRecord = PeerRecord.init(peerId, 42, multiAddresses)
|
||||
routingRecord = PeerRecord.init(peerId, multiAddresses, 42)
|
||||
|
||||
buffer = routingRecord.encode()
|
||||
|
||||
@@ -36,3 +36,33 @@ suite "Routing record":
|
||||
$decodedRecord.addresses[0].address == "/ip4/1.2.3.4/tcp/0"
|
||||
$decodedRecord.addresses[1].address == "/ip4/1.2.3.4/tcp/1"
|
||||
|
||||
suite "Signed Routing Record":
|
||||
test "Encode -> decode test":
|
||||
let
|
||||
rng = newRng()
|
||||
privKey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerId.init(privKey).tryGet()
|
||||
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
|
||||
routingRecord = SignedPeerRecord.init(privKey, PeerRecord.init(peerId, multiAddresses, 42)).tryGet()
|
||||
buffer = routingRecord.envelope.encode().tryGet()
|
||||
|
||||
parsedRR = SignedPeerRecord.decode(buffer).tryGet().data
|
||||
|
||||
check:
|
||||
parsedRR.peerId == peerId
|
||||
parsedRR.seqNo == 42
|
||||
parsedRR.addresses.len == 2
|
||||
parsedRR.addresses[0].address == multiAddresses[0]
|
||||
parsedRR.addresses[1].address == multiAddresses[1]
|
||||
|
||||
test "Can't use mismatched public key":
|
||||
let
|
||||
rng = newRng()
|
||||
privKey = PrivateKey.random(rng[]).tryGet()
|
||||
privKey2 = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerId.init(privKey).tryGet()
|
||||
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
|
||||
routingRecord = SignedPeerRecord.init(privKey2, PeerRecord.init(peerId, multiAddresses, 42)).tryGet()
|
||||
buffer = routingRecord.envelope.encode().tryGet()
|
||||
|
||||
check SignedPeerRecord.decode(buffer).error == EnvelopeInvalidSignature
|
||||
|
||||
@@ -36,7 +36,7 @@ suite "AsyncSemaphore":
|
||||
await sema.acquire()
|
||||
let fut = sema.acquire()
|
||||
|
||||
check sema.count == -1
|
||||
check sema.count == 0
|
||||
sema.release()
|
||||
sema.release()
|
||||
check sema.count == 1
|
||||
@@ -66,7 +66,7 @@ suite "AsyncSemaphore":
|
||||
|
||||
let fut = sema.acquire()
|
||||
check fut.finished == false
|
||||
check sema.count == -1
|
||||
check sema.count == 0
|
||||
|
||||
sema.release()
|
||||
sema.release()
|
||||
@@ -104,12 +104,20 @@ suite "AsyncSemaphore":
|
||||
|
||||
await sema.acquire()
|
||||
|
||||
let tmp = sema.acquire()
|
||||
check not tmp.finished()
|
||||
let
|
||||
tmp = sema.acquire()
|
||||
tmp2 = sema.acquire()
|
||||
check:
|
||||
not tmp.finished()
|
||||
not tmp2.finished()
|
||||
|
||||
tmp.cancel()
|
||||
sema.release()
|
||||
|
||||
check tmp2.finished()
|
||||
|
||||
sema.release()
|
||||
|
||||
check await sema.acquire().withTimeout(10.millis)
|
||||
|
||||
asyncTest "should handle out of order cancellations":
|
||||
@@ -145,3 +153,43 @@ suite "AsyncSemaphore":
|
||||
sema.release()
|
||||
|
||||
check await sema.acquire().withTimeout(10.millis)
|
||||
|
||||
asyncTest "should handle forceAcquire properly":
|
||||
let sema = newAsyncSemaphore(1)
|
||||
|
||||
await sema.acquire()
|
||||
check not(await sema.acquire().withTimeout(1.millis)) # should not acquire but cancel
|
||||
|
||||
let
|
||||
fut1 = sema.acquire()
|
||||
fut2 = sema.acquire()
|
||||
|
||||
sema.forceAcquire()
|
||||
sema.release()
|
||||
|
||||
await fut1 or fut2 or sleepAsync(1.millis)
|
||||
check:
|
||||
fut1.finished()
|
||||
not fut2.finished()
|
||||
|
||||
sema.release()
|
||||
await fut1 or fut2 or sleepAsync(1.millis)
|
||||
check:
|
||||
fut1.finished()
|
||||
fut2.finished()
|
||||
|
||||
|
||||
sema.forceAcquire()
|
||||
sema.forceAcquire()
|
||||
|
||||
let
|
||||
fut3 = sema.acquire()
|
||||
fut4 = sema.acquire()
|
||||
fut5 = sema.acquire()
|
||||
sema.release()
|
||||
sema.release()
|
||||
await sleepAsync(1.millis)
|
||||
check:
|
||||
fut3.finished()
|
||||
fut4.finished()
|
||||
not fut5.finished()
|
||||
|
||||
@@ -3,7 +3,7 @@ import stew/byteutils
|
||||
import ../libp2p/[signed_envelope]
|
||||
|
||||
suite "Signed envelope":
|
||||
test "Encode -> decode test":
|
||||
test "Encode -> decode -> encode -> decode test":
|
||||
let
|
||||
rng = newRng()
|
||||
privKey = PrivateKey.random(rng[]).tryGet()
|
||||
@@ -12,10 +12,16 @@ suite "Signed envelope":
|
||||
decodedEnvelope = Envelope.decode(buffer, "domain").tryGet()
|
||||
wrongDomain = Envelope.decode(buffer, "wdomain")
|
||||
|
||||
reencodedEnvelope = decodedEnvelope.encode().tryGet()
|
||||
redecodedEnvelope = Envelope.decode(reencodedEnvelope, "domain").tryGet()
|
||||
|
||||
check:
|
||||
decodedEnvelope == envelope
|
||||
wrongDomain.error == EnvelopeInvalidSignature
|
||||
|
||||
reencodedEnvelope == buffer
|
||||
redecodedEnvelope == envelope
|
||||
|
||||
test "Interop decode test":
|
||||
# from https://github.com/libp2p/go-libp2p-core/blob/b18a4c9c5629870bde2cd85ab3b87a507600d411/record/envelope_test.go#L68
|
||||
let inputData = "0a24080112206f1581709bb7b1ef030d210db18e3b0ba1c776fba65d8cdaad05415142d189f812102f6c69627032702f74657374646174611a0c68656c6c6f20776f726c64212a401178673b51dfa842aad17e465e25d646ad16628916b964c3fb10c711fee87872bdd4e4646f58c277cdff09704913d8be1aec6322de8d3d0bb852120374aece08".hexToSeqByte()
|
||||
@@ -28,3 +34,56 @@ suite "Signed envelope":
|
||||
# same as above, but payload altered
|
||||
let inputData = "0a24080112206f1581709bb7b1ef030d210db18e3b0ba1c776fba65d8cdaad05415142d189f812102f6c69627032702f74657374646174611a0c00006c6c6f20776f726c64212a401178673b51dfa842aad17e465e25d646ad16628916b964c3fb10c711fee87872bdd4e4646f58c277cdff09704913d8be1aec6322de8d3d0bb852120374aece08".hexToSeqByte()
|
||||
check Envelope.decode(inputData, "libp2p-testing").error == EnvelopeInvalidSignature
|
||||
|
||||
# needs to be exported to work
|
||||
type
|
||||
DummyPayload* = object
|
||||
awesome: byte
|
||||
SignedDummy = SignedPayload[DummyPayload]
|
||||
|
||||
proc decode*(T: typedesc[DummyPayload], buffer: seq[byte]): Result[DummyPayload, cstring] =
|
||||
ok(DummyPayload(awesome: buffer[0]))
|
||||
|
||||
proc encode*(pd: DummyPayload): seq[byte] =
|
||||
@[pd.awesome]
|
||||
|
||||
proc checkValid*(pd: SignedDummy): Result[void, EnvelopeError] =
|
||||
if pd.data.awesome == 12.byte: ok()
|
||||
else: err(EnvelopeInvalidSignature)
|
||||
|
||||
proc payloadDomain*(T: typedesc[DummyPayload]): string = "dummy"
|
||||
proc payloadType*(T: typedesc[DummyPayload]): seq[byte] = @[(byte) 0x00, (byte) 0x00]
|
||||
suite "Signed payload":
|
||||
test "Simple encode -> decode":
|
||||
let
|
||||
rng = newRng()
|
||||
privKey = PrivateKey.random(rng[]).tryGet()
|
||||
|
||||
dummyPayload = DummyPayload(awesome: 12.byte)
|
||||
signed = SignedDummy.init(privKey, dummyPayload).tryGet()
|
||||
encoded = signed.encode().tryGet()
|
||||
decoded = SignedDummy.decode(encoded).tryGet()
|
||||
|
||||
check:
|
||||
dummyPayload.awesome == decoded.data.awesome
|
||||
decoded.envelope.publicKey == privKey.getPublicKey().tryGet()
|
||||
|
||||
test "Invalid payload":
|
||||
let
|
||||
rng = newRng()
|
||||
privKey = PrivateKey.random(rng[]).tryGet()
|
||||
|
||||
dummyPayload = DummyPayload(awesome: 30.byte)
|
||||
signed = SignedDummy.init(privKey, dummyPayload).tryGet()
|
||||
encoded = signed.encode().tryGet()
|
||||
check SignedDummy.decode(encoded).error == EnvelopeInvalidSignature
|
||||
|
||||
test "Invalid payload type":
|
||||
let
|
||||
rng = newRng()
|
||||
privKey = PrivateKey.random(rng[]).tryGet()
|
||||
|
||||
dummyPayload = DummyPayload(awesome: 30.byte)
|
||||
signed = Envelope.init(privKey, @[55.byte], dummyPayload.encode(), DummyPayload.payloadDomain).tryGet()
|
||||
encoded = signed.encode().tryGet()
|
||||
check SignedDummy.decode(encoded).error == EnvelopeWrongType
|
||||
|
||||
@@ -841,6 +841,10 @@ suite "Switch":
|
||||
switch2.peerStore.protoBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.protocols.toHashSet()
|
||||
|
||||
asyncTest "e2e should allow multiple local addresses":
|
||||
when defined(windows):
|
||||
# this randomly locks the Windows CI job
|
||||
skip()
|
||||
return
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
||||
try:
|
||||
let msg = string.fromBytes(await conn.readLp(1024))
|
||||
|
||||
Reference in New Issue
Block a user