mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-09 14:28:11 -05:00
Compare commits
35 Commits
v1.4.0
...
gh-readonl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f1fd4eb7f4 | ||
|
|
ed5670408b | ||
|
|
97192a3c80 | ||
|
|
294d06323c | ||
|
|
a3b8729cbe | ||
|
|
6c970911f2 | ||
|
|
5d48776b02 | ||
|
|
d389d96789 | ||
|
|
09fe199b6b | ||
|
|
68306cf1f1 | ||
|
|
b37133ca43 | ||
|
|
3e3df07269 | ||
|
|
1771534030 | ||
|
|
21a444197c | ||
|
|
966996542e | ||
|
|
8070b21825 | ||
|
|
d98152f266 | ||
|
|
47a51983b5 | ||
|
|
70754cd575 | ||
|
|
a1811e7395 | ||
|
|
c6e8fadbda | ||
|
|
48846d69cb | ||
|
|
18a2e79ce2 | ||
|
|
55cc5434fe | ||
|
|
cde5ed7e8c | ||
|
|
6ec038d29a | ||
|
|
fdae9e4b42 | ||
|
|
a60f0c5532 | ||
|
|
62f2d85f11 | ||
|
|
e5e319c1a9 | ||
|
|
f8d4da6421 | ||
|
|
b5fb7b3a97 | ||
|
|
fa19bbbbb7 | ||
|
|
86563cbddd | ||
|
|
be801602f6 |
10
.github/actions/install_nim/action.yml
vendored
10
.github/actions/install_nim/action.yml
vendored
@@ -6,7 +6,7 @@ inputs:
|
||||
cpu:
|
||||
description: "CPU to build for"
|
||||
default: "amd64"
|
||||
nim_branch:
|
||||
nim_ref:
|
||||
description: "Nim version"
|
||||
default: "version-1-6"
|
||||
shell:
|
||||
@@ -61,7 +61,7 @@ runs:
|
||||
- name: Restore Nim DLLs dependencies (Windows) from cache
|
||||
if: inputs.os == 'Windows'
|
||||
id: windows-dlls-cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: external/dlls
|
||||
key: 'dlls'
|
||||
@@ -114,10 +114,10 @@ runs:
|
||||
|
||||
- name: Restore Nim from cache
|
||||
id: nim-cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: '${{ github.workspace }}/nim'
|
||||
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_branch }}-cache-${{ env.cache_nonce }}
|
||||
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_ref }}-cache-${{ env.cache_nonce }}
|
||||
|
||||
- name: Build Nim and Nimble
|
||||
shell: ${{ inputs.shell }}
|
||||
@@ -126,6 +126,6 @@ runs:
|
||||
# We don't want partial matches of the cache restored
|
||||
rm -rf nim
|
||||
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
|
||||
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ inputs.nim_branch }} \
|
||||
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ inputs.nim_ref }} \
|
||||
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
|
||||
bash build_nim.sh nim csources dist/nimble NimBinaries
|
||||
|
||||
89
.github/workflows/ci.yml
vendored
89
.github/workflows/ci.yml
vendored
@@ -1,10 +1,11 @@
|
||||
name: CI
|
||||
name: Continuous Integration
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- unstable
|
||||
pull_request:
|
||||
merge_group:
|
||||
workflow_dispatch:
|
||||
|
||||
concurrency:
|
||||
@@ -12,60 +13,70 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
build:
|
||||
test:
|
||||
timeout-minutes: 90
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target:
|
||||
platform:
|
||||
- os: linux
|
||||
cpu: amd64
|
||||
- os: linux
|
||||
cpu: i386
|
||||
- os: linux-gcc-14
|
||||
cpu: amd64
|
||||
- os: macos
|
||||
cpu: amd64
|
||||
- os: windows
|
||||
cpu: amd64
|
||||
#- os: windows
|
||||
#cpu: i386
|
||||
branch: [version-1-6]
|
||||
nim:
|
||||
- ref: version-1-6
|
||||
memory_management: refc
|
||||
# The ref below corresponds to the branch "version-2-0".
|
||||
# Right before an update from Nimble 0.16.1 to 0.16.2.
|
||||
# That update breaks our dependency resolution.
|
||||
- ref: 8754469f4947844c5938f56e1fba846c349354b5
|
||||
memory_management: refc
|
||||
include:
|
||||
- target:
|
||||
- platform:
|
||||
os: linux
|
||||
builder: ubuntu-20.04
|
||||
builder: ubuntu-22.04
|
||||
shell: bash
|
||||
- target:
|
||||
- platform:
|
||||
os: linux-gcc-14
|
||||
builder: ubuntu-24.04
|
||||
shell: bash
|
||||
- platform:
|
||||
os: macos
|
||||
builder: macos-12
|
||||
builder: macos-13
|
||||
shell: bash
|
||||
- target:
|
||||
- platform:
|
||||
os: windows
|
||||
builder: windows-2019
|
||||
builder: windows-2022
|
||||
shell: msys2 {0}
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: ${{ matrix.shell }}
|
||||
|
||||
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
|
||||
name: '${{ matrix.platform.os }}-${{ matrix.platform.cpu }} (Nim ${{ matrix.nim.ref }})'
|
||||
runs-on: ${{ matrix.builder }}
|
||||
continue-on-error: ${{ matrix.branch == 'devel' }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- name: Setup Nim
|
||||
uses: "./.github/actions/install_nim"
|
||||
with:
|
||||
os: ${{ matrix.target.os }}
|
||||
cpu: ${{ matrix.target.cpu }}
|
||||
os: ${{ matrix.platform.os }}
|
||||
cpu: ${{ matrix.platform.cpu }}
|
||||
shell: ${{ matrix.shell }}
|
||||
nim_branch: ${{ matrix.branch }}
|
||||
nim_ref: ${{ matrix.nim.ref }}
|
||||
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v2
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '~1.15.5'
|
||||
|
||||
@@ -78,37 +89,29 @@ jobs:
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: nimbledeps
|
||||
key: nimbledeps-${{ hashFiles('.pinned') }}
|
||||
# Using nim.ref as a simple way to differentiate between nimble using the "pkgs" or "pkgs2" directories.
|
||||
# The change happened on Nimble v0.14.0.
|
||||
key: nimbledeps-${{ matrix.nim.ref }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
|
||||
|
||||
- name: Install deps
|
||||
if: ${{ steps.deps-cache.outputs.cache-hit != 'true' }}
|
||||
run: |
|
||||
nimble install_pinned
|
||||
|
||||
- name: Use gcc 14
|
||||
if : ${{ matrix.platform.os == 'linux-gcc-14'}}
|
||||
run: |
|
||||
# Add GCC-14 to alternatives
|
||||
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-14 14
|
||||
|
||||
# Set GCC-14 as the default
|
||||
sudo update-alternatives --set gcc /usr/bin/gcc-14
|
||||
|
||||
- name: Run tests
|
||||
run: |
|
||||
nim --version
|
||||
nimble --version
|
||||
gcc --version
|
||||
|
||||
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }}"
|
||||
nimble test
|
||||
|
||||
lint:
|
||||
name: "Lint"
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 2 # In PR, has extra merge commit: ^1 = PR, ^2 = base
|
||||
|
||||
- name: Check nph formatting
|
||||
# Pin nph to a specific version to avoid sudden style differences.
|
||||
# Updating nph version should be accompanied with running the new
|
||||
# version on the fluffy directory.
|
||||
run: |
|
||||
VERSION="v0.5.1"
|
||||
ARCHIVE="nph-linux_x64.tar.gz"
|
||||
curl -L "https://github.com/arnetheduck/nph/releases/download/${VERSION}/${ARCHIVE}" -o ${ARCHIVE}
|
||||
tar -xzf ${ARCHIVE}
|
||||
shopt -s extglob # Enable extended globbing
|
||||
./nph examples libp2p tests tools *.@(nim|nims|nimble)
|
||||
git diff --exit-code
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
name: nim-libp2p codecov builds
|
||||
name: Coverage
|
||||
|
||||
on:
|
||||
#On push to common branches, this computes the "bases stats" for PRs
|
||||
# On push to common branches, this computes the coverage that PRs will use for diff
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- unstable
|
||||
pull_request:
|
||||
merge_group:
|
||||
workflow_dispatch:
|
||||
|
||||
concurrency:
|
||||
@@ -14,12 +14,13 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
Coverage:
|
||||
runs-on: ubuntu-20.04
|
||||
codecov:
|
||||
name: Run coverage and upload to codecov
|
||||
runs-on: ubuntu-22.04
|
||||
env:
|
||||
CICOV: YES
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -32,7 +33,7 @@ jobs:
|
||||
|
||||
- name: Restore deps from cache
|
||||
id: deps-cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: nimbledeps
|
||||
key: nimbledeps-${{ hashFiles('.pinned') }}
|
||||
@@ -42,24 +43,28 @@ jobs:
|
||||
run: |
|
||||
nimble install_pinned
|
||||
|
||||
- name: Run
|
||||
- name: Setup coverage
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y lcov build-essential git curl
|
||||
mkdir coverage
|
||||
|
||||
- name: Run test suite with coverage flags
|
||||
run: |
|
||||
export NIMFLAGS="--lineDir:on --passC:-fprofile-arcs --passC:-ftest-coverage --passL:-fprofile-arcs --passL:-ftest-coverage"
|
||||
nimble testnative
|
||||
nimble testpubsub
|
||||
nimble testfilter
|
||||
|
||||
- name: Run coverage
|
||||
run: |
|
||||
find nimcache -name *.c -delete
|
||||
lcov --capture --directory nimcache --output-file coverage/coverage.info
|
||||
shopt -s globstar
|
||||
ls `pwd`/libp2p/{*,**/*}.nim
|
||||
lcov --extract coverage/coverage.info `pwd`/libp2p/{*,**/*}.nim --output-file coverage/coverage.f.info
|
||||
genhtml coverage/coverage.f.info --output-directory coverage/output
|
||||
bash <(curl -s https://codecov.io/bash) -f coverage/coverage.f.info || echo "Codecov did not collect coverage reports"
|
||||
|
||||
#- uses: actions/upload-artifact@master
|
||||
# with:
|
||||
# name: coverage
|
||||
# path: coverage
|
||||
- name: Upload coverage to codecov
|
||||
run: |
|
||||
bash <(curl -s https://codecov.io/bash) -f coverage/coverage.f.info || echo "Codecov did not collect coverage reports"
|
||||
12
.github/workflows/daily.yml
vendored
12
.github/workflows/daily.yml
vendored
@@ -1,12 +0,0 @@
|
||||
name: Daily
|
||||
on:
|
||||
schedule:
|
||||
- cron: "30 6 * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
call-multi-nim-common:
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim-branch: "['version-1-6','version-2-0']"
|
||||
cpu: "['amd64']"
|
||||
14
.github/workflows/daily_amd64.yml
vendored
Normal file
14
.github/workflows/daily_amd64.yml
vendored
Normal file
@@ -0,0 +1,14 @@
|
||||
name: Daily amd64
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "30 6 * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
test_amd64:
|
||||
name: Daily amd64
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': '8754469f4947844c5938f56e1fba846c349354b5', 'memory_management': 'refc'}]"
|
||||
cpu: "['amd64']"
|
||||
59
.github/workflows/daily_common.yml
vendored
59
.github/workflows/daily_common.yml
vendored
@@ -1,12 +1,13 @@
|
||||
name: daily-common
|
||||
name: Daily Common
|
||||
# Serves as base workflow for daily tasks, it's not run by itself.
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
nim-branch:
|
||||
description: 'Nim branch'
|
||||
nim:
|
||||
description: 'Nim Configuration'
|
||||
required: true
|
||||
type: string
|
||||
type: string # Following this format: [{"ref": ..., "memory_management": ...}, ...]
|
||||
cpu:
|
||||
description: 'CPU'
|
||||
required: true
|
||||
@@ -16,30 +17,40 @@ on:
|
||||
required: false
|
||||
type: string
|
||||
default: "[]"
|
||||
use_sat_solver:
|
||||
description: 'Install dependencies with SAT Solver'
|
||||
required: false
|
||||
type: boolean
|
||||
default: false
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
delete-cache:
|
||||
delete_cache:
|
||||
name: Delete github action's branch cache
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: snnaplab/delete-branch-cache-action@v1
|
||||
|
||||
build:
|
||||
needs: delete-cache
|
||||
timeout-minutes: 120
|
||||
test:
|
||||
needs: delete_cache
|
||||
timeout-minutes: 90
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
platform:
|
||||
- os: linux
|
||||
builder: ubuntu-20
|
||||
builder: ubuntu-22.04
|
||||
shell: bash
|
||||
- os: macos
|
||||
builder: macos-12
|
||||
builder: macos-13
|
||||
shell: bash
|
||||
- os: windows
|
||||
builder: windows-2019
|
||||
builder: windows-2022
|
||||
shell: msys2 {0}
|
||||
branch: ${{ fromJSON(inputs.nim-branch) }}
|
||||
nim: ${{ fromJSON(inputs.nim) }}
|
||||
cpu: ${{ fromJSON(inputs.cpu) }}
|
||||
exclude: ${{ fromJSON(inputs.exclude) }}
|
||||
|
||||
@@ -47,9 +58,8 @@ jobs:
|
||||
run:
|
||||
shell: ${{ matrix.platform.shell }}
|
||||
|
||||
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.branch }})'
|
||||
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.ref }})'
|
||||
runs-on: ${{ matrix.platform.builder }}
|
||||
continue-on-error: ${{ matrix.branch == 'devel' || matrix.branch == 'version-2-0' }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
@@ -59,11 +69,11 @@ jobs:
|
||||
with:
|
||||
os: ${{ matrix.platform.os }}
|
||||
shell: ${{ matrix.platform.shell }}
|
||||
nim_branch: ${{ matrix.branch }}
|
||||
nim_ref: ${{ matrix.nim.ref }}
|
||||
cpu: ${{ matrix.cpu }}
|
||||
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v4
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '~1.15.5'
|
||||
cache: false
|
||||
@@ -71,14 +81,21 @@ jobs:
|
||||
- name: Install p2pd
|
||||
run: |
|
||||
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
nimble install -y --depsOnly
|
||||
|
||||
- name: Run tests
|
||||
run: |
|
||||
nim --version
|
||||
nimble --version
|
||||
nimble install -y --depsOnly
|
||||
NIMFLAGS="${NIMFLAGS} --mm:refc" nimble test
|
||||
if [[ "${{ matrix.branch }}" == "devel" ]]; then
|
||||
echo -e "\nTesting with '--mm:orc':\n"
|
||||
NIMFLAGS="${NIMFLAGS} --mm:orc" nimble test
|
||||
|
||||
if [[ "${{ inputs.use_sat_solver }}" == "true" ]]; then
|
||||
dependency_solver="sat"
|
||||
else
|
||||
dependency_solver="legacy"
|
||||
fi
|
||||
|
||||
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }} --solver:${dependency_solver}"
|
||||
nimble test
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
name: Daily Nim Devel
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "30 6 * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
call-multi-nim-common:
|
||||
test_nim_devel:
|
||||
name: Daily Nim Devel
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim-branch: "['devel']"
|
||||
nim: "[{'ref': 'devel', 'memory_management': 'orc'}]"
|
||||
cpu: "['amd64']"
|
||||
6
.github/workflows/daily_i386.yml
vendored
6
.github/workflows/daily_i386.yml
vendored
@@ -1,13 +1,15 @@
|
||||
name: Daily i386
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "30 6 * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
call-multi-nim-common:
|
||||
test_i386:
|
||||
name: Daily i386 (Linux)
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim-branch: "['version-1-6','version-2-0', 'devel']"
|
||||
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': '8754469f4947844c5938f56e1fba846c349354b5', 'memory_management': 'refc'}, {'ref': 'devel', 'memory_management': 'orc'}]"
|
||||
cpu: "['i386']"
|
||||
exclude: "[{'platform': {'os':'macos'}}, {'platform': {'os':'windows'}}]"
|
||||
|
||||
15
.github/workflows/daily_sat.yml
vendored
Normal file
15
.github/workflows/daily_sat.yml
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
name: Daily SAT
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "30 6 * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
test_amd64:
|
||||
name: Daily SAT
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim: "[{'ref': '8754469f4947844c5938f56e1fba846c349354b5', 'memory_management': 'refc'}]"
|
||||
cpu: "['amd64']"
|
||||
use_sat_solver: true
|
||||
@@ -1,4 +1,5 @@
|
||||
name: Bumper
|
||||
name: Dependencies
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
@@ -6,34 +7,38 @@ on:
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
bumpProjects:
|
||||
bumper:
|
||||
# Pushes new refs to interested external repositories, so they can do early testing against libp2p's newer versions
|
||||
runs-on: ubuntu-latest
|
||||
name: Bump libp2p's version for ${{ matrix.target.repository }}:${{ matrix.target.ref }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target: [
|
||||
{ repo: status-im/nimbus-eth2, branch: unstable },
|
||||
{ repo: waku-org/nwaku, branch: master },
|
||||
{ repo: codex-storage/nim-codex, branch: master }
|
||||
]
|
||||
target:
|
||||
- repository: status-im/nimbus-eth2
|
||||
ref: unstable
|
||||
- repository: waku-org/nwaku
|
||||
ref: master
|
||||
- repository: codex-storage/nim-codex
|
||||
ref: master
|
||||
steps:
|
||||
- name: Clone repo
|
||||
uses: actions/checkout@v2
|
||||
- name: Clone target repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: ${{ matrix.target.repo }}
|
||||
ref: ${{ matrix.target.branch }}
|
||||
repository: ${{ matrix.target.repository }}
|
||||
ref: ${{ matrix.target.ref}}
|
||||
path: nbc
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.ACTIONS_GITHUB_TOKEN }}
|
||||
token: ${{ secrets.ACTIONS_GITHUB_TOKEN }}
|
||||
|
||||
- name: Checkout this ref
|
||||
- name: Checkout this ref in target repository
|
||||
run: |
|
||||
cd nbc
|
||||
git submodule update --init vendor/nim-libp2p
|
||||
cd vendor/nim-libp2p
|
||||
git checkout $GITHUB_SHA
|
||||
|
||||
- name: Commit this bump
|
||||
- name: Push this ref to target repository
|
||||
run: |
|
||||
cd nbc
|
||||
git config --global user.email "${{ github.actor }}@users.noreply.github.com"
|
||||
@@ -42,3 +47,4 @@ jobs:
|
||||
git branch -D nim-libp2p-auto-bump-${GITHUB_REF##*/} || true
|
||||
git switch -c nim-libp2p-auto-bump-${GITHUB_REF##*/}
|
||||
git push -f origin nim-libp2p-auto-bump-${GITHUB_REF##*/}
|
||||
|
||||
@@ -1,21 +1,21 @@
|
||||
name: Docgen
|
||||
name: Documentation Generation And Publishing
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
workflow_dispatch:
|
||||
|
||||
|
||||
jobs:
|
||||
build:
|
||||
timeout-minutes: 20
|
||||
|
||||
name: 'Generate & upload documentation'
|
||||
runs-on: 'ubuntu-20.04'
|
||||
runs-on: ubuntu-latest
|
||||
continue-on-error: true
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
@@ -35,7 +35,7 @@ jobs:
|
||||
ls ${GITHUB_REF##*/}
|
||||
|
||||
- name: Clone the gh-pages branch
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: vacp2p/nim-libp2p
|
||||
ref: gh-pages
|
||||
@@ -66,7 +66,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- uses: actions/setup-python@v2
|
||||
with:
|
||||
@@ -80,7 +80,7 @@ jobs:
|
||||
run: pip install mkdocs-material && nimble -y website
|
||||
|
||||
- name: Clone the gh-pages branch
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: vacp2p/nim-libp2p
|
||||
ref: gh-pages
|
||||
21
.github/workflows/interop.yml
vendored
21
.github/workflows/interop.yml
vendored
@@ -1,9 +1,11 @@
|
||||
name: Interoperability Testing
|
||||
name: Interoperability Tests
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
merge_group:
|
||||
push:
|
||||
branches:
|
||||
- unstable
|
||||
- master
|
||||
workflow_dispatch:
|
||||
|
||||
concurrency:
|
||||
@@ -15,6 +17,13 @@ jobs:
|
||||
name: Run transport interoperability tests
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Free Disk Space
|
||||
# For some reason we have space issues while running this action. Likely while building the image.
|
||||
# This action will free up some space to avoid the issue.
|
||||
uses: jlumbroso/free-disk-space@v1.3.1
|
||||
with:
|
||||
tool-cache: true
|
||||
|
||||
- uses: actions/checkout@v4
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
- name: Build image
|
||||
@@ -24,6 +33,10 @@ jobs:
|
||||
with:
|
||||
test-filter: nim-libp2p-head
|
||||
extra-versions: ${{ github.workspace }}/tests/transport-interop/version.json
|
||||
s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }}
|
||||
s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }}
|
||||
s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }}
|
||||
aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }}
|
||||
|
||||
run-hole-punching-interop:
|
||||
name: Run hole-punching interoperability tests
|
||||
@@ -38,3 +51,7 @@ jobs:
|
||||
with:
|
||||
test-filter: nim-libp2p-head
|
||||
extra-versions: ${{ github.workspace }}/tests/hole-punching-interop/version.json
|
||||
s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }}
|
||||
s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }}
|
||||
s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }}
|
||||
aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }}
|
||||
|
||||
34
.github/workflows/linters.yml
vendored
Normal file
34
.github/workflows/linters.yml
vendored
Normal file
@@ -0,0 +1,34 @@
|
||||
name: Linters
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
merge_group:
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
nph:
|
||||
name: NPH
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 2 # In PR, has extra merge commit: ^1 = PR, ^2 = base
|
||||
|
||||
- name: Setup NPH
|
||||
# Pin nph to a specific version to avoid sudden style differences.
|
||||
# Updating nph version should be accompanied with running the new version on the fluffy directory.
|
||||
run: |
|
||||
VERSION="v0.5.1"
|
||||
ARCHIVE="nph-linux_x64.tar.gz"
|
||||
curl -L "https://github.com/arnetheduck/nph/releases/download/${VERSION}/${ARCHIVE}" -o ${ARCHIVE}
|
||||
tar -xzf ${ARCHIVE}
|
||||
|
||||
- name: Check style
|
||||
run: |
|
||||
shopt -s extglob # Enable extended globbing
|
||||
./nph examples libp2p tests tools *.@(nim|nims|nimble)
|
||||
git diff --exit-code
|
||||
6
.pinned
6
.pinned
@@ -1,12 +1,14 @@
|
||||
bearssl;https://github.com/status-im/nim-bearssl@#e4157639db180e52727712a47deaefcbbac6ec86
|
||||
bearssl;https://github.com/status-im/nim-bearssl@#667b40440a53a58e9f922e29e20818720c62d9ac
|
||||
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
|
||||
chronos;https://github.com/status-im/nim-chronos@#672db137b7cad9b384b8f4fb551fb6bbeaabfe1b
|
||||
chronos;https://github.com/status-im/nim-chronos@#c04576d829b8a0a1b12baaa8bc92037501b3a4a0
|
||||
dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8
|
||||
faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309
|
||||
httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
|
||||
json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
|
||||
metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
|
||||
ngtcp2;https://github.com/status-im/nim-ngtcp2@#6834f4756b6af58356ac9c4fef3d71db3c3ae5fe
|
||||
nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
|
||||
quic;https://github.com/status-im/nim-quic.git@#ddcb31ffb74b5460ab37fd13547eca90594248bc
|
||||
results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad
|
||||
secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be
|
||||
serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35
|
||||
|
||||
@@ -12,6 +12,9 @@ switch("warning", "LockLevel:off")
|
||||
switch("warningAsError", "UseBase:on")
|
||||
--styleCheck:
|
||||
error
|
||||
--mm:
|
||||
refc
|
||||
# reconsider when there's a version-2-2 branch worth testing with as we might switch to orc
|
||||
|
||||
# Avoid some rare stack corruption while using exceptions with a SEH-enabled
|
||||
# toolchain: https://github.com/status-im/nimbus-eth2/issues/3121
|
||||
|
||||
5
funding.json
Normal file
5
funding.json
Normal file
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"opRetro": {
|
||||
"projectId": "0xc9561ba3e4eca5483b40f8b1a254a73c91fefe4f8aee32dc20c0d96dcf33fe80"
|
||||
}
|
||||
}
|
||||
@@ -52,6 +52,7 @@ else:
|
||||
stream/connection,
|
||||
transports/transport,
|
||||
transports/tcptransport,
|
||||
transports/quictransport,
|
||||
protocols/secure/noise,
|
||||
cid,
|
||||
multihash,
|
||||
|
||||
@@ -1,16 +1,17 @@
|
||||
mode = ScriptMode.Verbose
|
||||
|
||||
packageName = "libp2p"
|
||||
version = "1.4.0"
|
||||
version = "1.7.0"
|
||||
author = "Status Research & Development GmbH"
|
||||
description = "LibP2P implementation"
|
||||
license = "MIT"
|
||||
skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
|
||||
|
||||
requires "nim >= 1.6.0",
|
||||
"nimcrypto >= 0.4.1", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.1.4",
|
||||
"chronicles >= 0.10.2", "chronos >= 4.0.0", "metrics", "secp256k1", "stew#head",
|
||||
"websock", "unittest2"
|
||||
"nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
|
||||
"chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew#head",
|
||||
"websock", "unittest2",
|
||||
"https://github.com/status-im/nim-quic.git#ddcb31ffb74b5460ab37fd13547eca90594248bc"
|
||||
|
||||
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
|
||||
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)
|
||||
@@ -19,8 +20,8 @@ let verbose = getEnv("V", "") notin ["", "0"]
|
||||
|
||||
let cfg =
|
||||
" --styleCheck:usages --styleCheck:error" &
|
||||
(if verbose: "" else: " --verbosity:0 --hints:off") &
|
||||
" --skipParentCfg --skipUserCfg -f" & " --threads:on --opt:speed"
|
||||
(if verbose: "" else: " --verbosity:0 --hints:off") & " --skipUserCfg -f" &
|
||||
" --threads:on --opt:speed"
|
||||
|
||||
import hashes, strutils
|
||||
|
||||
@@ -126,7 +127,7 @@ task examples_build, "Build the samples":
|
||||
buildSample("tutorial_5_discovery", true)
|
||||
exec "nimble install -y nimpng@#HEAD"
|
||||
# this is to fix broken build on 1.7.3, remove it when nimpng version 0.3.2 or later is released
|
||||
exec "nimble install -y nico"
|
||||
exec "nimble install -y nico@#af99dd60bf2b395038ece815ea1012330a80d6e6"
|
||||
buildSample("tutorial_6_game", false, "--styleCheck:off")
|
||||
|
||||
# pin system
|
||||
|
||||
@@ -136,7 +136,8 @@ proc triggerConnEvent*(c: ConnManager, peerId: PeerId, event: ConnEvent) {.async
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
warn "Exception in triggerConnEvents", msg = exc.msg, peer = peerId, event = $event
|
||||
warn "Exception in triggerConnEvents",
|
||||
description = exc.msg, peer = peerId, event = $event
|
||||
|
||||
proc addPeerEventHandler*(
|
||||
c: ConnManager, handler: PeerEventHandler, kind: PeerEventKind
|
||||
@@ -169,7 +170,7 @@ proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.asyn
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc: # handlers should not raise!
|
||||
warn "Exception in triggerPeerEvents", exc = exc.msg, peer = peerId
|
||||
warn "Exception in triggerPeerEvents", description = exc.msg, peer = peerId
|
||||
|
||||
proc expectConnection*(
|
||||
c: ConnManager, p: PeerId, dir: Direction
|
||||
@@ -212,7 +213,7 @@ proc closeMuxer(muxer: Muxer) {.async.} =
|
||||
try:
|
||||
await muxer.handler # TODO noraises?
|
||||
except CatchableError as exc:
|
||||
trace "Exception in close muxer handler", exc = exc.msg
|
||||
trace "Exception in close muxer handler", description = exc.msg
|
||||
trace "Cleaned up muxer", m = muxer
|
||||
|
||||
proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
|
||||
@@ -235,7 +236,7 @@ proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
|
||||
except CatchableError as exc:
|
||||
# This is top-level procedure which will work as separate task, so it
|
||||
# do not need to propagate CancelledError and should handle other errors
|
||||
warn "Unexpected exception peer cleanup handler", mux, msg = exc.msg
|
||||
warn "Unexpected exception peer cleanup handler", mux, description = exc.msg
|
||||
|
||||
proc onClose(c: ConnManager, mux: Muxer) {.async.} =
|
||||
## connection close even handler
|
||||
@@ -246,7 +247,8 @@ proc onClose(c: ConnManager, mux: Muxer) {.async.} =
|
||||
await mux.connection.join()
|
||||
trace "Connection closed, cleaning up", mux
|
||||
except CatchableError as exc:
|
||||
debug "Unexpected exception in connection manager's cleanup", errMsg = exc.msg, mux
|
||||
debug "Unexpected exception in connection manager's cleanup",
|
||||
description = exc.msg, mux
|
||||
finally:
|
||||
await c.muxCleanup(mux)
|
||||
|
||||
@@ -294,7 +296,8 @@ proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =
|
||||
if expectedConn != nil and not expectedConn.finished:
|
||||
expectedConn.complete(muxer)
|
||||
else:
|
||||
debug "Too many connections for peer", conns = c.muxed.getOrDefault(peerId).len
|
||||
debug "Too many connections for peer",
|
||||
conns = c.muxed.getOrDefault(peerId).len, peerId, dir
|
||||
|
||||
raise newTooManyConnectionsError()
|
||||
|
||||
@@ -358,7 +361,7 @@ proc trackConnection*(cs: ConnectionSlot, conn: Connection) =
|
||||
try:
|
||||
await conn.join()
|
||||
except CatchableError as exc:
|
||||
trace "Exception in semaphore monitor, ignoring", exc = exc.msg
|
||||
trace "Exception in semaphore monitor, ignoring", description = exc.msg
|
||||
|
||||
cs.release()
|
||||
|
||||
|
||||
@@ -198,7 +198,7 @@ proc random*(
|
||||
case scheme
|
||||
of PKScheme.RSA:
|
||||
when supported(PKScheme.RSA):
|
||||
let rsakey = ?RsaPrivateKey.random(rng, bits).orError(KeyError)
|
||||
let rsakey = ?RsaPrivateKey.random(rng, bits).orError(CryptoError.KeyError)
|
||||
ok(PrivateKey(scheme: scheme, rsakey: rsakey))
|
||||
else:
|
||||
err(SchemeError)
|
||||
@@ -210,7 +210,8 @@ proc random*(
|
||||
err(SchemeError)
|
||||
of PKScheme.ECDSA:
|
||||
when supported(PKScheme.ECDSA):
|
||||
let eckey = ?ecnist.EcPrivateKey.random(Secp256r1, rng).orError(KeyError)
|
||||
let eckey =
|
||||
?ecnist.EcPrivateKey.random(Secp256r1, rng).orError(CryptoError.KeyError)
|
||||
ok(PrivateKey(scheme: scheme, eckey: eckey))
|
||||
else:
|
||||
err(SchemeError)
|
||||
@@ -237,10 +238,11 @@ proc random*(
|
||||
let skkey = SkPrivateKey.random(rng)
|
||||
ok(PrivateKey(scheme: PKScheme.Secp256k1, skkey: skkey))
|
||||
elif supported(PKScheme.RSA):
|
||||
let rsakey = ?RsaPrivateKey.random(rng, bits).orError(KeyError)
|
||||
let rsakey = ?RsaPrivateKey.random(rng, bits).orError(CryptoError.KeyError)
|
||||
ok(PrivateKey(scheme: PKScheme.RSA, rsakey: rsakey))
|
||||
elif supported(PKScheme.ECDSA):
|
||||
let eckey = ?ecnist.EcPrivateKey.random(Secp256r1, rng).orError(KeyError)
|
||||
let eckey =
|
||||
?ecnist.EcPrivateKey.random(Secp256r1, rng).orError(CryptoError.KeyError)
|
||||
ok(PrivateKey(scheme: PKScheme.ECDSA, eckey: eckey))
|
||||
else:
|
||||
err(SchemeError)
|
||||
@@ -258,7 +260,7 @@ proc random*(
|
||||
case scheme
|
||||
of PKScheme.RSA:
|
||||
when supported(PKScheme.RSA):
|
||||
let pair = ?RsaKeyPair.random(rng, bits).orError(KeyError)
|
||||
let pair = ?RsaKeyPair.random(rng, bits).orError(CryptoError.KeyError)
|
||||
ok(
|
||||
KeyPair(
|
||||
seckey: PrivateKey(scheme: scheme, rsakey: pair.seckey),
|
||||
@@ -280,7 +282,7 @@ proc random*(
|
||||
err(SchemeError)
|
||||
of PKScheme.ECDSA:
|
||||
when supported(PKScheme.ECDSA):
|
||||
let pair = ?EcKeyPair.random(Secp256r1, rng).orError(KeyError)
|
||||
let pair = ?EcKeyPair.random(Secp256r1, rng).orError(CryptoError.KeyError)
|
||||
ok(
|
||||
KeyPair(
|
||||
seckey: PrivateKey(scheme: scheme, eckey: pair.seckey),
|
||||
@@ -583,7 +585,7 @@ proc init*(t: typedesc[PrivateKey], data: openArray[byte]): CryptoResult[Private
|
||||
## Create new private key from libp2p's protobuf serialized binary form.
|
||||
var res: t
|
||||
if not res.init(data):
|
||||
err(KeyError)
|
||||
err(CryptoError.KeyError)
|
||||
else:
|
||||
ok(res)
|
||||
|
||||
@@ -591,7 +593,7 @@ proc init*(t: typedesc[PublicKey], data: openArray[byte]): CryptoResult[PublicKe
|
||||
## Create new public key from libp2p's protobuf serialized binary form.
|
||||
var res: t
|
||||
if not res.init(data):
|
||||
err(KeyError)
|
||||
err(CryptoError.KeyError)
|
||||
else:
|
||||
ok(res)
|
||||
|
||||
|
||||
@@ -75,7 +75,9 @@ proc public*(private: Curve25519Key): Curve25519Key =
|
||||
proc random*(_: type[Curve25519Key], rng: var HmacDrbgContext): Curve25519Key =
|
||||
var res: Curve25519Key
|
||||
let defaultBrEc = ecGetDefault()
|
||||
let len = ecKeygen(addr rng.vtable, defaultBrEc, nil, addr res[0], EC_curve25519)
|
||||
let len = ecKeygen(
|
||||
PrngClassPointerConst(addr rng.vtable), defaultBrEc, nil, addr res[0], EC_curve25519
|
||||
)
|
||||
# Per bearssl documentation, the keygen only fails if the curve is
|
||||
# unrecognised -
|
||||
doAssert len == Curve25519KeySize, "Could not generate curve"
|
||||
|
||||
@@ -234,7 +234,11 @@ proc random*(
|
||||
var ecimp = ecGetDefault()
|
||||
var res = new EcPrivateKey
|
||||
if ecKeygen(
|
||||
addr rng.vtable, ecimp, addr res.key, addr res.buffer[0], safeConvert[cint](kind)
|
||||
PrngClassPointerConst(addr rng.vtable),
|
||||
ecimp,
|
||||
addr res.key,
|
||||
addr res.buffer[0],
|
||||
safeConvert[cint](kind),
|
||||
) == 0:
|
||||
err(EcKeyGenError)
|
||||
else:
|
||||
|
||||
@@ -62,10 +62,12 @@ proc dialAndUpgrade(
|
||||
libp2p_total_dial_attempts.inc()
|
||||
await transport.dial(hostname, address, peerId)
|
||||
except CancelledError as exc:
|
||||
debug "Dialing canceled", err = exc.msg, peerId = peerId.get(default(PeerId))
|
||||
trace "Dialing canceled",
|
||||
description = exc.msg, peerId = peerId.get(default(PeerId))
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Dialing failed", err = exc.msg, peerId = peerId.get(default(PeerId))
|
||||
debug "Dialing failed",
|
||||
description = exc.msg, peerId = peerId.get(default(PeerId))
|
||||
libp2p_failed_dials.inc()
|
||||
return nil # Try the next address
|
||||
|
||||
@@ -87,7 +89,7 @@ proc dialAndUpgrade(
|
||||
# we won't succeeded through another - no use in trying again
|
||||
await dialed.close()
|
||||
debug "Connection upgrade failed",
|
||||
err = exc.msg, peerId = peerId.get(default(PeerId))
|
||||
description = exc.msg, peerId = peerId.get(default(PeerId))
|
||||
if dialed.dir == Direction.Out:
|
||||
libp2p_failed_upgrades_outgoing.inc()
|
||||
else:
|
||||
@@ -200,7 +202,7 @@ proc internalConnect(
|
||||
PeerEvent(kind: PeerEventKind.Identified, initiator: true),
|
||||
)
|
||||
except CatchableError as exc:
|
||||
trace "Failed to finish outgoung upgrade", err = exc.msg
|
||||
trace "Failed to finish outgoung upgrade", description = exc.msg
|
||||
await muxed.close()
|
||||
raise exc
|
||||
|
||||
@@ -327,7 +329,7 @@ method dial*(
|
||||
await cleanup()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Error dialing", conn, err = exc.msg
|
||||
debug "Error dialing", conn, description = exc.msg
|
||||
await cleanup()
|
||||
raise exc
|
||||
|
||||
|
||||
@@ -64,7 +64,7 @@ method advertise*(self: RendezVousInterface) {.async.} =
|
||||
try:
|
||||
await self.rdv.advertise(toAdv, self.ttl)
|
||||
except CatchableError as error:
|
||||
debug "RendezVous advertise error: ", msg = error.msg
|
||||
debug "RendezVous advertise error: ", description = error.msg
|
||||
|
||||
await sleepAsync(self.timeToAdvertise) or self.advertisementUpdated.wait()
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
|
||||
# We still don't abort but warn
|
||||
debug "A future has failed, enable trace logging for details",
|
||||
error = exc.name
|
||||
trace "Exception message", msg = exc.msg, stack = getStackTrace()
|
||||
trace "Exception message", description = exc.msg, stack = getStackTrace()
|
||||
else:
|
||||
quote:
|
||||
for res in `futs`:
|
||||
@@ -40,9 +40,9 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
|
||||
let exc = res.readError()
|
||||
for i in 0 ..< `nexclude`:
|
||||
if exc of `exclude`[i]:
|
||||
trace "A future has failed", error = exc.name, msg = exc.msg
|
||||
trace "A future has failed", error = exc.name, description = exc.msg
|
||||
break check
|
||||
# We still don't abort but warn
|
||||
debug "A future has failed, enable trace logging for details",
|
||||
error = exc.name
|
||||
trace "Exception details", msg = exc.msg
|
||||
trace "Exception details", description = exc.msg
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
{.push raises: [].}
|
||||
{.push public.}
|
||||
|
||||
import pkg/chronos, chronicles
|
||||
import pkg/[chronos, chronicles, results]
|
||||
import std/[nativesockets, net, hashes]
|
||||
import tables, strutils, sets
|
||||
import
|
||||
@@ -25,8 +25,8 @@ import
|
||||
protobuf/minprotobuf,
|
||||
errors,
|
||||
utility
|
||||
import stew/[base58, base32, endians2, results]
|
||||
export results, minprotobuf, vbuffer, utility
|
||||
import stew/[base58, base32, endians2]
|
||||
export results, minprotobuf, vbuffer, errors, utility
|
||||
|
||||
logScope:
|
||||
topics = "libp2p multiaddress"
|
||||
@@ -71,6 +71,9 @@ type
|
||||
tcpProtocol
|
||||
udpProtocol
|
||||
|
||||
func maErr*(msg: string): ref MaError =
|
||||
(ref MaError)(msg: msg)
|
||||
|
||||
const
|
||||
# These are needed in order to avoid an ambiguity error stemming from
|
||||
# some cint constants with the same name defined in the posix modules
|
||||
@@ -408,7 +411,12 @@ const
|
||||
UDP_IP* = mapAnd(IP, mapEq("udp"))
|
||||
UDP* = mapOr(UDP_DNS, UDP_IP)
|
||||
UTP* = mapAnd(UDP, mapEq("utp"))
|
||||
QUIC* = mapAnd(UDP, mapEq("quic"))
|
||||
QUIC_IP* = mapAnd(UDP_IP, mapEq("quic"))
|
||||
QUIC_DNS* = mapAnd(UDP_DNS, mapEq("quic"))
|
||||
QUIC* = mapOr(QUIC_DNS, QUIC_IP)
|
||||
QUIC_V1_IP* = mapAnd(UDP_IP, mapEq("quic-v1"))
|
||||
QUIC_V1_DNS* = mapAnd(UDP_DNS, mapEq("quic-v1"))
|
||||
QUIC_V1* = mapOr(QUIC_V1_DNS, QUIC_V1_IP)
|
||||
UNIX* = mapEq("unix")
|
||||
WS_DNS* = mapAnd(TCP_DNS, mapEq("ws"))
|
||||
WS_IP* = mapAnd(TCP_IP, mapEq("ws"))
|
||||
@@ -970,23 +978,21 @@ proc append*(m1: var MultiAddress, m2: MultiAddress): MaResult[void] =
|
||||
else:
|
||||
ok()
|
||||
|
||||
proc `&`*(m1, m2: MultiAddress): MultiAddress {.raises: [LPError].} =
|
||||
proc `&`*(m1, m2: MultiAddress): MultiAddress {.raises: [MaError].} =
|
||||
## Concatenates two addresses ``m1`` and ``m2``, and returns result.
|
||||
##
|
||||
## This procedure performs validation of concatenated result and can raise
|
||||
## exception on error.
|
||||
##
|
||||
concat(m1, m2).valueOr:
|
||||
raise maErr error
|
||||
|
||||
concat(m1, m2).tryGet()
|
||||
|
||||
proc `&=`*(m1: var MultiAddress, m2: MultiAddress) {.raises: [LPError].} =
|
||||
proc `&=`*(m1: var MultiAddress, m2: MultiAddress) {.raises: [MaError].} =
|
||||
## Concatenates two addresses ``m1`` and ``m2``.
|
||||
##
|
||||
## This procedure performs validation of concatenated result and can raise
|
||||
## exception on error.
|
||||
##
|
||||
|
||||
m1.append(m2).tryGet()
|
||||
m1.append(m2).isOkOr:
|
||||
raise maErr error
|
||||
|
||||
proc `==`*(m1: var MultiAddress, m2: MultiAddress): bool =
|
||||
## Check of two MultiAddress are equal
|
||||
|
||||
@@ -212,7 +212,7 @@ proc handle*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "Exception in multistream", conn, msg = exc.msg
|
||||
trace "Exception in multistream", conn, description = exc.msg
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
|
||||
@@ -116,7 +116,7 @@ proc reset*(s: LPChannel) {.async: (raises: []).} =
|
||||
trace "sending reset message", s, conn = s.conn
|
||||
await noCancel s.conn.writeMsg(s.id, s.resetCode) # write reset
|
||||
except LPStreamError as exc:
|
||||
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
|
||||
trace "Can't send reset message", s, conn = s.conn, description = exc.msg
|
||||
await s.conn.close()
|
||||
|
||||
asyncSpawn resetMessage()
|
||||
@@ -145,7 +145,7 @@ method close*(s: LPChannel) {.async: (raises: []).} =
|
||||
# It's harmless that close message cannot be sent - the connection is
|
||||
# likely down already
|
||||
await s.conn.close()
|
||||
trace "Cannot send close message", s, id = s.id, msg = exc.msg
|
||||
trace "Cannot send close message", s, id = s.id, description = exc.msg
|
||||
|
||||
await s.closeUnderlying() # maybe already eofed
|
||||
|
||||
@@ -256,7 +256,7 @@ proc completeWrite(
|
||||
except LPStreamEOFError as exc:
|
||||
raise exc
|
||||
except LPStreamError as exc:
|
||||
trace "exception in lpchannel write handler", s, msg = exc.msg
|
||||
trace "exception in lpchannel write handler", s, description = exc.msg
|
||||
await s.reset()
|
||||
await s.conn.close()
|
||||
raise newLPStreamConnDownError(exc)
|
||||
|
||||
@@ -70,7 +70,7 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async: (raises: []), inline.} =
|
||||
labelValues = [$chann.initiator, $m.connection.peerId],
|
||||
)
|
||||
except CancelledError as exc:
|
||||
warn "Error cleaning up mplex channel", m, chann, msg = exc.msg
|
||||
warn "Error cleaning up mplex channel", m, chann, description = exc.msg
|
||||
|
||||
proc newStreamInternal*(
|
||||
m: Mplex,
|
||||
@@ -175,7 +175,7 @@ method handle*(m: Mplex) {.async: (raises: []).} =
|
||||
except LPStreamClosedError as exc:
|
||||
# Channel is being closed, but `cleanupChann` was not yet triggered.
|
||||
trace "pushing data to channel failed",
|
||||
m, channel, len = data.len, msg = exc.msg
|
||||
m, channel, len = data.len, description = exc.msg
|
||||
discard # Ignore message, same as if `cleanupChann` had completed.
|
||||
of MessageType.CloseIn, MessageType.CloseOut:
|
||||
await channel.pushEof()
|
||||
@@ -185,11 +185,11 @@ method handle*(m: Mplex) {.async: (raises: []).} =
|
||||
except CancelledError:
|
||||
debug "Unexpected cancellation in mplex handler", m
|
||||
except LPStreamEOFError as exc:
|
||||
trace "Stream EOF", m, msg = exc.msg
|
||||
trace "Stream EOF", m, description = exc.msg
|
||||
except LPStreamError as exc:
|
||||
debug "Unexpected stream exception in mplex read loop", m, msg = exc.msg
|
||||
debug "Unexpected stream exception in mplex read loop", m, description = exc.msg
|
||||
except MuxerError as exc:
|
||||
debug "Unexpected muxer exception in mplex read loop", m, msg = exc.msg
|
||||
debug "Unexpected muxer exception in mplex read loop", m, description = exc.msg
|
||||
finally:
|
||||
await m.close()
|
||||
trace "Stopped mplex handler", m
|
||||
|
||||
@@ -279,10 +279,15 @@ method readOnce*(
|
||||
raise newLPStreamRemoteClosedError()
|
||||
if channel.recvQueue.len == 0:
|
||||
channel.receivedData.clear()
|
||||
try: # https://github.com/status-im/nim-chronos/issues/516
|
||||
discard await race(channel.closedRemotely.wait(), channel.receivedData.wait())
|
||||
except ValueError:
|
||||
raiseAssert("Futures list is not empty")
|
||||
let
|
||||
closedRemotelyFut = channel.closedRemotely.wait()
|
||||
receivedDataFut = channel.receivedData.wait()
|
||||
defer:
|
||||
if not closedRemotelyFut.finished():
|
||||
await closedRemotelyFut.cancelAndWait()
|
||||
if not receivedDataFut.finished():
|
||||
await receivedDataFut.cancelAndWait()
|
||||
await closedRemotelyFut or receivedDataFut
|
||||
if channel.closedRemotely.isSet() and channel.recvQueue.len == 0:
|
||||
channel.isEof = true
|
||||
return
|
||||
@@ -508,9 +513,9 @@ method close*(m: Yamux) {.async: (raises: []).} =
|
||||
try:
|
||||
await m.connection.write(YamuxHeader.goAway(NormalTermination))
|
||||
except CancelledError as exc:
|
||||
trace "cancelled sending goAway", msg = exc.msg
|
||||
trace "cancelled sending goAway", description = exc.msg
|
||||
except LPStreamError as exc:
|
||||
trace "failed to send goAway", msg = exc.msg
|
||||
trace "failed to send goAway", description = exc.msg
|
||||
await m.connection.close()
|
||||
trace "Closed yamux"
|
||||
|
||||
@@ -596,7 +601,7 @@ method handle*(m: Yamux) {.async: (raises: []).} =
|
||||
if header.length > 0:
|
||||
var buffer = newSeqUninitialized[byte](header.length)
|
||||
await m.connection.readExactly(addr buffer[0], int(header.length))
|
||||
trace "Msg Rcv", msg = shortLog(buffer)
|
||||
trace "Msg Rcv", description = shortLog(buffer)
|
||||
await channel.gotDataFromRemote(buffer)
|
||||
|
||||
if MsgFlags.Fin in header.flags:
|
||||
@@ -606,19 +611,19 @@ method handle*(m: Yamux) {.async: (raises: []).} =
|
||||
trace "remote reset channel"
|
||||
await channel.reset()
|
||||
except CancelledError as exc:
|
||||
debug "Unexpected cancellation in yamux handler", msg = exc.msg
|
||||
debug "Unexpected cancellation in yamux handler", description = exc.msg
|
||||
except LPStreamEOFError as exc:
|
||||
trace "Stream EOF", msg = exc.msg
|
||||
trace "Stream EOF", description = exc.msg
|
||||
except LPStreamError as exc:
|
||||
debug "Unexpected stream exception in yamux read loop", msg = exc.msg
|
||||
debug "Unexpected stream exception in yamux read loop", description = exc.msg
|
||||
except YamuxError as exc:
|
||||
trace "Closing yamux connection", error = exc.msg
|
||||
trace "Closing yamux connection", description = exc.msg
|
||||
try:
|
||||
await m.connection.write(YamuxHeader.goAway(ProtocolError))
|
||||
except CancelledError, LPStreamError:
|
||||
discard
|
||||
except MuxerError as exc:
|
||||
debug "Unexpected muxer exception in yamux read loop", msg = exc.msg
|
||||
debug "Unexpected muxer exception in yamux read loop", description = exc.msg
|
||||
try:
|
||||
await m.connection.write(YamuxHeader.goAway(ProtocolError))
|
||||
except CancelledError, LPStreamError:
|
||||
|
||||
@@ -41,7 +41,7 @@ proc questionToBuf(address: string, kind: QKind): seq[byte] =
|
||||
discard requestStream.readData(addr buf[0], dataLen)
|
||||
return buf
|
||||
except CatchableError as exc:
|
||||
info "Failed to created DNS buffer", msg = exc.msg
|
||||
info "Failed to created DNS buffer", description = exc.msg
|
||||
return newSeq[byte](0)
|
||||
|
||||
proc getDnsResponse(
|
||||
|
||||
@@ -84,13 +84,13 @@ proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.asy
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except AllFuturesFailedError as exc:
|
||||
debug "All dial attempts failed", addrs, exc = exc.msg
|
||||
debug "All dial attempts failed", addrs, description = exc.msg
|
||||
await conn.sendResponseError(DialError, "All dial attempts failed")
|
||||
except AsyncTimeoutError as exc:
|
||||
debug "Dial timeout", addrs, exc = exc.msg
|
||||
debug "Dial timeout", addrs, description = exc.msg
|
||||
await conn.sendResponseError(DialError, "Dial timeout")
|
||||
except CatchableError as exc:
|
||||
debug "Unexpected error", addrs, exc = exc.msg
|
||||
debug "Unexpected error", addrs, description = exc.msg
|
||||
await conn.sendResponseError(DialError, "Unexpected error")
|
||||
finally:
|
||||
autonat.sem.release()
|
||||
@@ -165,7 +165,7 @@ proc new*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "exception in autonat handler", exc = exc.msg, conn
|
||||
debug "exception in autonat handler", description = exc.msg, conn
|
||||
finally:
|
||||
trace "exiting autonat handler", conn
|
||||
await conn.close()
|
||||
|
||||
@@ -146,13 +146,13 @@ proc askPeer(
|
||||
debug "dialMe answer is reachable"
|
||||
Reachable
|
||||
except AutonatUnreachableError as error:
|
||||
debug "dialMe answer is not reachable", msg = error.msg
|
||||
debug "dialMe answer is not reachable", description = error.msg
|
||||
NotReachable
|
||||
except AsyncTimeoutError as error:
|
||||
debug "dialMe timed out", msg = error.msg
|
||||
debug "dialMe timed out", description = error.msg
|
||||
Unknown
|
||||
except CatchableError as error:
|
||||
debug "dialMe unexpected error", msg = error.msg
|
||||
debug "dialMe unexpected error", description = error.msg
|
||||
Unknown
|
||||
let hasReachabilityOrConfidenceChanged = await self.handleAnswer(ans)
|
||||
if hasReachabilityOrConfidenceChanged:
|
||||
@@ -194,7 +194,7 @@ proc addressMapper(
|
||||
processedMA = peerStore.guessDialableAddr(listenAddr)
|
||||
# handle manual port forwarding
|
||||
except CatchableError as exc:
|
||||
debug "Error while handling address mapper", msg = exc.msg
|
||||
debug "Error while handling address mapper", description = exc.msg
|
||||
addrs.add(processedMA)
|
||||
return addrs
|
||||
|
||||
|
||||
@@ -88,7 +88,7 @@ proc startSync*(
|
||||
raise err
|
||||
except AllFuturesFailedError as err:
|
||||
debug "Dcutr initiator could not connect to the remote peer, all connect attempts failed",
|
||||
peerDialableAddrs, msg = err.msg
|
||||
peerDialableAddrs, description = err.msg
|
||||
raise newException(
|
||||
DcutrError,
|
||||
"Dcutr initiator could not connect to the remote peer, all connect attempts failed",
|
||||
@@ -96,7 +96,7 @@ proc startSync*(
|
||||
)
|
||||
except AsyncTimeoutError as err:
|
||||
debug "Dcutr initiator could not connect to the remote peer, all connect attempts timed out",
|
||||
peerDialableAddrs, msg = err.msg
|
||||
peerDialableAddrs, description = err.msg
|
||||
raise newException(
|
||||
DcutrError,
|
||||
"Dcutr initiator could not connect to the remote peer, all connect attempts timed out",
|
||||
@@ -104,7 +104,7 @@ proc startSync*(
|
||||
)
|
||||
except CatchableError as err:
|
||||
debug "Unexpected error when Dcutr initiator tried to connect to the remote peer",
|
||||
err = err.msg
|
||||
description = err.msg
|
||||
raise newException(
|
||||
DcutrError,
|
||||
"Unexpected error when Dcutr initiator tried to connect to the remote peer", err,
|
||||
|
||||
@@ -80,13 +80,13 @@ proc new*(
|
||||
raise err
|
||||
except AllFuturesFailedError as err:
|
||||
debug "Dcutr receiver could not connect to the remote peer, " &
|
||||
"all connect attempts failed", peerDialableAddrs, msg = err.msg
|
||||
"all connect attempts failed", peerDialableAddrs, description = err.msg
|
||||
except AsyncTimeoutError as err:
|
||||
debug "Dcutr receiver could not connect to the remote peer, " &
|
||||
"all connect attempts timed out", peerDialableAddrs, msg = err.msg
|
||||
"all connect attempts timed out", peerDialableAddrs, description = err.msg
|
||||
except CatchableError as err:
|
||||
warn "Unexpected error when Dcutr receiver tried to connect " &
|
||||
"to the remote peer", msg = err.msg
|
||||
"to the remote peer", description = err.msg
|
||||
|
||||
let self = T()
|
||||
self.handler = handleStream
|
||||
|
||||
@@ -93,7 +93,7 @@ proc reserve*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error writing or reading reservation message", exc = exc.msg
|
||||
trace "error writing or reading reservation message", description = exc.msg
|
||||
raise newException(ReservationError, exc.msg)
|
||||
|
||||
if msg.msgType != HopMessageType.Status:
|
||||
@@ -139,7 +139,7 @@ proc dialPeerV1*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error writing hop request", exc = exc.msg
|
||||
trace "error writing hop request", description = exc.msg
|
||||
raise exc
|
||||
|
||||
let msgRcvFromRelayOpt =
|
||||
@@ -148,7 +148,7 @@ proc dialPeerV1*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error reading stop response", exc = exc.msg
|
||||
trace "error reading stop response", description = exc.msg
|
||||
await sendStatus(conn, StatusV1.HopCantOpenDstStream)
|
||||
raise exc
|
||||
|
||||
@@ -190,13 +190,13 @@ proc dialPeerV2*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error reading stop response", exc = exc.msg
|
||||
trace "error reading stop response", description = exc.msg
|
||||
raise newException(RelayV2DialError, exc.msg)
|
||||
|
||||
if msgRcvFromRelay.msgType != HopMessageType.Status:
|
||||
raise newException(RelayV2DialError, "Unexpected stop response")
|
||||
if msgRcvFromRelay.status.get(UnexpectedMessage) != Ok:
|
||||
trace "Relay stop failed", msg = msgRcvFromRelay.status
|
||||
trace "Relay stop failed", description = msgRcvFromRelay.status
|
||||
raise newException(RelayV2DialError, "Relay stop failure")
|
||||
conn.limitDuration = msgRcvFromRelay.limit.duration
|
||||
conn.limitData = msgRcvFromRelay.limit.data
|
||||
@@ -302,7 +302,7 @@ proc new*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception in client handler", exc = exc.msg, conn
|
||||
trace "exception in client handler", description = exc.msg, conn
|
||||
finally:
|
||||
trace "exiting client handler", conn
|
||||
await conn.close()
|
||||
|
||||
@@ -167,7 +167,7 @@ proc handleConnect(r: Relay, connSrc: Connection, msg: HopMessage) {.async.} =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error opening relay stream", dst, exc = exc.msg
|
||||
trace "error opening relay stream", dst, description = exc.msg
|
||||
await sendHopStatus(connSrc, ConnectionFailed)
|
||||
return
|
||||
defer:
|
||||
@@ -196,7 +196,7 @@ proc handleConnect(r: Relay, connSrc: Connection, msg: HopMessage) {.async.} =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error sending stop message", msg = exc.msg
|
||||
trace "error sending stop message", description = exc.msg
|
||||
await sendHopStatus(connSrc, ConnectionFailed)
|
||||
return
|
||||
|
||||
@@ -213,7 +213,7 @@ proc handleHopStreamV2*(r: Relay, conn: Connection) {.async.} =
|
||||
let msg = HopMessage.decode(await conn.readLp(r.msgSize)).valueOr:
|
||||
await sendHopStatus(conn, MalformedMessage)
|
||||
return
|
||||
trace "relayv2 handle stream", msg = msg
|
||||
trace "relayv2 handle stream", hopMsg = msg
|
||||
case msg.msgType
|
||||
of HopMessageType.Reserve:
|
||||
await r.handleReserve(conn)
|
||||
@@ -272,7 +272,7 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error opening relay stream", dst, exc = exc.msg
|
||||
trace "error opening relay stream", dst, description = exc.msg
|
||||
await sendStatus(connSrc, StatusV1.HopCantDialDst)
|
||||
return
|
||||
defer:
|
||||
@@ -289,12 +289,13 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error writing stop handshake or reading stop response", exc = exc.msg
|
||||
trace "error writing stop handshake or reading stop response",
|
||||
description = exc.msg
|
||||
await sendStatus(connSrc, StatusV1.HopCantOpenDstStream)
|
||||
return
|
||||
|
||||
let msgRcvFromDst = msgRcvFromDstOpt.valueOr:
|
||||
trace "error reading stop response", msg = msgRcvFromDstOpt
|
||||
trace "error reading stop response", response = msgRcvFromDstOpt
|
||||
await sendStatus(connSrc, StatusV1.HopCantOpenDstStream)
|
||||
return
|
||||
|
||||
@@ -369,7 +370,7 @@ proc new*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "exception in relayv2 handler", exc = exc.msg, conn
|
||||
debug "exception in relayv2 handler", description = exc.msg, conn
|
||||
finally:
|
||||
trace "exiting relayv2 handler", conn
|
||||
await conn.close()
|
||||
|
||||
@@ -87,7 +87,7 @@ proc bridge*(
|
||||
trace "relay src closed connection", src = connSrc.peerId
|
||||
if connDst.closed() or connDst.atEof():
|
||||
trace "relay dst closed connection", dst = connDst.peerId
|
||||
trace "relay error", exc = exc.msg
|
||||
trace "relay error", description = exc.msg
|
||||
trace "end relaying", bytesSentFromSrcToDst, bytesSentFromDstToSrc
|
||||
await futSrc.cancelAndWait()
|
||||
await futDst.cancelAndWait()
|
||||
|
||||
@@ -156,7 +156,7 @@ method init*(p: Identify) =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception in identify handler", exc = exc.msg, conn
|
||||
trace "exception in identify handler", description = exc.msg, conn
|
||||
finally:
|
||||
trace "exiting identify handler", conn
|
||||
await conn.closeWithEOF()
|
||||
@@ -226,7 +226,7 @@ proc init*(p: IdentifyPush) =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
info "exception in identify push handler", exc = exc.msg, conn
|
||||
info "exception in identify push handler", description = exc.msg, conn
|
||||
finally:
|
||||
trace "exiting identify push handler", conn
|
||||
await conn.closeWithEOF()
|
||||
|
||||
@@ -49,7 +49,7 @@ proc new*(T: typedesc[Perf]): T {.public.} =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception in perf handler", exc = exc.msg, conn
|
||||
trace "exception in perf handler", description = exc.msg, conn
|
||||
await conn.close()
|
||||
|
||||
p.handler = handle
|
||||
|
||||
@@ -63,7 +63,7 @@ method init*(p: Ping) =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception in ping handler", exc = exc.msg, conn
|
||||
trace "exception in ping handler", description = exc.msg, conn
|
||||
|
||||
p.handler = handle
|
||||
p.codec = PingCodec
|
||||
|
||||
@@ -106,7 +106,7 @@ method rpcHandler*(f: FloodSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
|
||||
debug "failed to decode msg from peer", peer, err = error
|
||||
raise newException(CatchableError, "Peer msg couldn't be decoded")
|
||||
|
||||
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
|
||||
trace "decoded msg from peer", peer, payload = rpcMsg.shortLog
|
||||
# trigger hooks
|
||||
peer.recvObservers(rpcMsg)
|
||||
|
||||
@@ -187,7 +187,7 @@ method init*(f: FloodSub) =
|
||||
# do not need to propagate CancelledError.
|
||||
trace "Unexpected cancellation in floodsub handler", conn
|
||||
except CatchableError as exc:
|
||||
trace "FloodSub handler leaks an error", exc = exc.msg, conn
|
||||
trace "FloodSub handler leaks an error", description = exc.msg, conn
|
||||
|
||||
f.handler = handler
|
||||
f.codec = FloodSubCodec
|
||||
@@ -219,7 +219,7 @@ method publish*(f: FloodSub, topic: string, data: seq[byte]): Future[int] {.asyn
|
||||
trace "Error generating message id, skipping publish", error = error
|
||||
return 0
|
||||
|
||||
trace "Created new message", msg = shortLog(msg), peers = peers.len, topic, msgId
|
||||
trace "Created new message", payload = shortLog(msg), peers = peers.len, topic, msgId
|
||||
|
||||
if f.addSeen(f.salt(msgId)):
|
||||
# custom msgid providers might cause this
|
||||
|
||||
@@ -220,7 +220,7 @@ method init*(g: GossipSub) =
|
||||
# do not need to propogate CancelledError.
|
||||
trace "Unexpected cancellation in gossipsub handler", conn
|
||||
except CatchableError as exc:
|
||||
trace "GossipSub handler leaks an error", exc = exc.msg, conn
|
||||
trace "GossipSub handler leaks an error", description = exc.msg, conn
|
||||
|
||||
g.handler = handler
|
||||
g.codecs &= GossipSubCodec_12
|
||||
@@ -368,7 +368,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
else:
|
||||
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
|
||||
|
||||
trace "sending control message", msg = shortLog(respControl), peer
|
||||
trace "sending control message", payload = shortLog(respControl), peer
|
||||
g.send(peer, RPCMsg(control: some(respControl)), isHighPriority = true)
|
||||
|
||||
if messages.len > 0:
|
||||
@@ -453,6 +453,9 @@ proc validateAndRelay(
|
||||
|
||||
g.rewardDelivered(peer, topic, true)
|
||||
|
||||
# trigger hooks
|
||||
peer.validatedObservers(msg, msgId)
|
||||
|
||||
# The send list typically matches the idontwant list from above, but
|
||||
# might differ if validation takes time
|
||||
var toSendPeers = HashSet[PubSubPeer]()
|
||||
@@ -488,7 +491,7 @@ proc validateAndRelay(
|
||||
|
||||
await handleData(g, topic, msg.data)
|
||||
except CatchableError as exc:
|
||||
info "validateAndRelay failed", msg = exc.msg
|
||||
info "validateAndRelay failed", description = exc.msg
|
||||
|
||||
proc dataAndTopicsIdSize(msgs: seq[Message]): int =
|
||||
msgs.mapIt(it.data.len + it.topic.len).foldl(a + b, 0)
|
||||
@@ -537,7 +540,7 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
|
||||
for m in rpcMsg.messages:
|
||||
libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, m.topic])
|
||||
|
||||
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
|
||||
trace "decoded msg from peer", peer, payload = rpcMsg.shortLog
|
||||
await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))
|
||||
|
||||
# trigger hooks - these may modify the message
|
||||
@@ -768,7 +771,7 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy
|
||||
logScope:
|
||||
msgId = shortLog(msgId)
|
||||
|
||||
trace "Created new message", msg = shortLog(msg), peers = peers.len
|
||||
trace "Created new message", payload = shortLog(msg), peers = peers.len
|
||||
|
||||
if g.addSeen(g.salt(msgId)):
|
||||
# If the message was received or published recently, don't re-publish it -
|
||||
@@ -803,7 +806,7 @@ proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.as
|
||||
trace "Direct peer dial canceled"
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Direct peer error dialing", msg = exc.msg
|
||||
debug "Direct peer error dialing", description = exc.msg
|
||||
|
||||
proc addDirectPeer*(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
|
||||
g.parameters.directPeers[id] = addrs
|
||||
|
||||
@@ -135,7 +135,7 @@ proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
|
||||
try:
|
||||
await g.switch.disconnect(peer.peerId)
|
||||
except CatchableError as exc: # Never cancelled
|
||||
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
|
||||
trace "Failed to close connection", peer, errName = exc.name, description = exc.msg
|
||||
|
||||
proc disconnectIfBadScorePeer*(g: GossipSub, peer: PubSubPeer, score: float64) =
|
||||
if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and
|
||||
|
||||
@@ -197,7 +197,7 @@ proc send*(
|
||||
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
|
||||
## priority messages have been sent.
|
||||
|
||||
trace "sending pubsub message to peer", peer, msg = shortLog(msg)
|
||||
trace "sending pubsub message to peer", peer, payload = shortLog(msg)
|
||||
peer.send(msg, p.anonymize, isHighPriority)
|
||||
|
||||
proc broadcast*(
|
||||
@@ -255,7 +255,7 @@ proc broadcast*(
|
||||
else:
|
||||
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"])
|
||||
|
||||
trace "broadcasting messages to peers", peers = sendPeers.len, msg = shortLog(msg)
|
||||
trace "broadcasting messages to peers", peers = sendPeers.len, payload = shortLog(msg)
|
||||
|
||||
if anyIt(sendPeers, it.hasObservers):
|
||||
for peer in sendPeers:
|
||||
@@ -403,7 +403,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
|
||||
for fut in futs:
|
||||
if fut.failed:
|
||||
let err = fut.readError()
|
||||
warn "Error in topic handler", msg = err.msg
|
||||
warn "Error in topic handler", description = err.msg
|
||||
|
||||
return waiter()
|
||||
|
||||
@@ -437,7 +437,7 @@ method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception ocurred in pubsub handle", exc = exc.msg, conn
|
||||
trace "exception ocurred in pubsub handle", description = exc.msg, conn
|
||||
finally:
|
||||
await conn.closeWithEOF()
|
||||
|
||||
|
||||
@@ -67,6 +67,8 @@ type
|
||||
PubSubObserver* = ref object
|
||||
onRecv*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].}
|
||||
onSend*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].}
|
||||
onValidated*:
|
||||
proc(peer: PubSubPeer, msg: Message, msgId: MessageId) {.gcsafe, raises: [].}
|
||||
|
||||
PubSubPeerEventKind* {.pure.} = enum
|
||||
StreamOpened
|
||||
@@ -170,14 +172,23 @@ proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) =
|
||||
if not (isNil(p.observers)) and p.observers[].len > 0:
|
||||
for obs in p.observers[]:
|
||||
if not (isNil(obs)): # TODO: should never be nil, but...
|
||||
obs.onRecv(p, msg)
|
||||
if not (isNil(obs.onRecv)):
|
||||
obs.onRecv(p, msg)
|
||||
|
||||
proc validatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) =
|
||||
# trigger hooks
|
||||
if not (isNil(p.observers)) and p.observers[].len > 0:
|
||||
for obs in p.observers[]:
|
||||
if not (isNil(obs.onValidated)):
|
||||
obs.onValidated(p, msg, msgId)
|
||||
|
||||
proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
|
||||
# trigger hooks
|
||||
if not (isNil(p.observers)) and p.observers[].len > 0:
|
||||
for obs in p.observers[]:
|
||||
if not (isNil(obs)): # TODO: should never be nil, but...
|
||||
obs.onSend(p, msg)
|
||||
if not (isNil(obs.onSend)):
|
||||
obs.onSend(p, msg)
|
||||
|
||||
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||
debug "starting pubsub read loop", conn, peer = p, closed = conn.closed
|
||||
@@ -194,10 +205,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||
data = newSeq[byte]() # Release memory
|
||||
except PeerRateLimitError as exc:
|
||||
debug "Peer rate limit exceeded, exiting read while",
|
||||
conn, peer = p, error = exc.msg
|
||||
conn, peer = p, description = exc.msg
|
||||
except CatchableError as exc:
|
||||
debug "Exception occurred in PubSubPeer.handle",
|
||||
conn, peer = p, closed = conn.closed, exc = exc.msg
|
||||
conn, peer = p, closed = conn.closed, description = exc.msg
|
||||
finally:
|
||||
await conn.close()
|
||||
except CancelledError:
|
||||
@@ -206,7 +217,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||
trace "Unexpected cancellation in PubSubPeer.handle"
|
||||
except CatchableError as exc:
|
||||
trace "Exception occurred in PubSubPeer.handle",
|
||||
conn, peer = p, closed = conn.closed, exc = exc.msg
|
||||
conn, peer = p, closed = conn.closed, description = exc.msg
|
||||
finally:
|
||||
debug "exiting pubsub read loop", conn, peer = p, closed = conn.closed
|
||||
|
||||
@@ -225,7 +236,7 @@ proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Errors during diconnection events", error = exc.msg
|
||||
debug "Errors during diconnection events", description = exc.msg
|
||||
# don't cleanup p.address else we leak some gossip stat table
|
||||
|
||||
proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
|
||||
@@ -272,7 +283,7 @@ proc connectImpl(p: PubSubPeer) {.async.} =
|
||||
return
|
||||
await connectOnce(p)
|
||||
except CatchableError as exc: # never cancelled
|
||||
debug "Could not establish send connection", msg = exc.msg
|
||||
debug "Could not establish send connection", description = exc.msg
|
||||
|
||||
proc connect*(p: PubSubPeer) =
|
||||
if p.connected:
|
||||
@@ -314,7 +325,7 @@ proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
|
||||
except CatchableError as exc: # never cancelled
|
||||
# Because we detach the send call from the currently executing task using
|
||||
# asyncSpawn, no exceptions may leak out of it
|
||||
trace "Unable to send to remote", conn, msg = exc.msg
|
||||
trace "Unable to send to remote", conn, description = exc.msg
|
||||
# Next time sendConn is used, it will be have its close flag set and thus
|
||||
# will be recycled
|
||||
|
||||
@@ -330,7 +341,7 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
|
||||
|
||||
var conn = p.sendConn
|
||||
if conn == nil or conn.closed():
|
||||
debug "No send connection", p, msg = shortLog(msg)
|
||||
debug "No send connection", p, payload = shortLog(msg)
|
||||
return
|
||||
|
||||
trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
|
||||
@@ -372,7 +383,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
|
||||
) == 0
|
||||
|
||||
if msg.len <= 0:
|
||||
debug "empty message, skipping", p, msg = shortLog(msg)
|
||||
debug "empty message, skipping", p, payload = shortLog(msg)
|
||||
Future[void].completed()
|
||||
elif msg.len > p.maxMessageSize:
|
||||
info "trying to send a msg too big for pubsub",
|
||||
|
||||
@@ -310,7 +310,7 @@ proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} =
|
||||
ok(msgs)
|
||||
|
||||
proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
|
||||
trace "encodeRpcMsg: encoding message", msg = msg.shortLog()
|
||||
trace "encodeRpcMsg: encoding message", payload = msg.shortLog()
|
||||
var pb = initProtoBuffer(maxSize = uint.high)
|
||||
for item in msg.subscriptions:
|
||||
pb.write(1, item)
|
||||
@@ -329,7 +329,7 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
|
||||
pb.buffer
|
||||
|
||||
proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
|
||||
trace "decodeRpcMsg: decoding message", msg = msg.shortLog()
|
||||
trace "decodeRpcMsg: decoding message", payload = msg.shortLog()
|
||||
var pb = initProtoBuffer(msg, maxSize = uint.high)
|
||||
var rpcMsg = RPCMsg()
|
||||
assign(rpcMsg.messages, ?pb.decodeMessages())
|
||||
|
||||
@@ -35,8 +35,6 @@ const
|
||||
RendezVousCodec* = "/rendezvous/1.0.0"
|
||||
MinimumDuration* = 2.hours
|
||||
MaximumDuration = 72.hours
|
||||
MinimumTTL = MinimumDuration.seconds.uint64
|
||||
MaximumTTL = MaximumDuration.seconds.uint64
|
||||
RegistrationLimitPerPeer = 1000
|
||||
DiscoverLimit = 1000'u64
|
||||
SemaphoreDefaultSize = 5
|
||||
@@ -320,6 +318,10 @@ type
|
||||
peers: seq[PeerId]
|
||||
cookiesSaved: Table[PeerId, Table[string, seq[byte]]]
|
||||
switch: Switch
|
||||
minDuration: Duration
|
||||
maxDuration: Duration
|
||||
minTTL: uint64
|
||||
maxTTL: uint64
|
||||
|
||||
proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] =
|
||||
if spr.len == 0:
|
||||
@@ -395,7 +397,7 @@ proc save(
|
||||
rdv.registered.add(
|
||||
RegisteredData(
|
||||
peerId: peerId,
|
||||
expiration: Moment.now() + r.ttl.get(MinimumTTL).int64.seconds,
|
||||
expiration: Moment.now() + r.ttl.get(rdv.minTTL).int64.seconds,
|
||||
data: r,
|
||||
)
|
||||
)
|
||||
@@ -409,8 +411,8 @@ proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
|
||||
libp2p_rendezvous_register.inc()
|
||||
if r.ns.len notin 1 .. 255:
|
||||
return conn.sendRegisterResponseError(InvalidNamespace)
|
||||
let ttl = r.ttl.get(MinimumTTL)
|
||||
if ttl notin MinimumTTL .. MaximumTTL:
|
||||
let ttl = r.ttl.get(rdv.minTTL)
|
||||
if ttl notin rdv.minTTL .. rdv.maxTTL:
|
||||
return conn.sendRegisterResponseError(InvalidTTL)
|
||||
let pr = checkPeerRecord(r.signedPeerRecord, conn.peerId)
|
||||
if pr.isErr():
|
||||
@@ -499,31 +501,42 @@ proc advertisePeer(rdv: RendezVous, peer: PeerId, msg: seq[byte]) {.async.} =
|
||||
else:
|
||||
trace "Successfully registered", peer, response = msgRecv.registerResponse
|
||||
except CatchableError as exc:
|
||||
trace "exception in the advertise", error = exc.msg
|
||||
trace "exception in the advertise", description = exc.msg
|
||||
finally:
|
||||
rdv.sema.release()
|
||||
|
||||
await rdv.sema.acquire()
|
||||
discard await advertiseWrap().withTimeout(5.seconds)
|
||||
|
||||
method advertise*(
|
||||
rdv: RendezVous, ns: string, ttl: Duration = MinimumDuration
|
||||
) {.async, base.} =
|
||||
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
|
||||
raise newException(RendezVousError, "Wrong Signed Peer Record")
|
||||
proc advertise*(
|
||||
rdv: RendezVous, ns: string, ttl: Duration, peers: seq[PeerId]
|
||||
) {.async.} =
|
||||
if ns.len notin 1 .. 255:
|
||||
raise newException(RendezVousError, "Invalid namespace")
|
||||
if ttl notin MinimumDuration .. MaximumDuration:
|
||||
raise newException(RendezVousError, "Invalid time to live")
|
||||
|
||||
if ttl notin rdv.minDuration .. rdv.maxDuration:
|
||||
raise newException(RendezVousError, "Invalid time to live: " & $ttl)
|
||||
|
||||
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
|
||||
raise newException(RendezVousError, "Wrong Signed Peer Record")
|
||||
|
||||
let
|
||||
r = Register(ns: ns, signedPeerRecord: sprBuff, ttl: Opt.some(ttl.seconds.uint64))
|
||||
msg = encode(Message(msgType: MessageType.Register, register: Opt.some(r)))
|
||||
|
||||
rdv.save(ns, rdv.switch.peerInfo.peerId, r)
|
||||
let fut = collect(newSeq()):
|
||||
for peer in rdv.peers:
|
||||
|
||||
let futs = collect(newSeq()):
|
||||
for peer in peers:
|
||||
trace "Send Advertise", peerId = peer, ns
|
||||
rdv.advertisePeer(peer, msg.buffer)
|
||||
await allFutures(fut)
|
||||
|
||||
await allFutures(futs)
|
||||
|
||||
method advertise*(
|
||||
rdv: RendezVous, ns: string, ttl: Duration = rdv.minDuration
|
||||
) {.async, base.} =
|
||||
await rdv.advertise(ns, ttl, rdv.peers)
|
||||
|
||||
proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
|
||||
let
|
||||
@@ -540,9 +553,8 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
|
||||
@[]
|
||||
|
||||
proc request*(
|
||||
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
|
||||
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
|
||||
): Future[seq[PeerRecord]] {.async.} =
|
||||
let nsSalted = ns & rdv.salt
|
||||
var
|
||||
s: Table[PeerId, (PeerRecord, Register)]
|
||||
limit: uint64
|
||||
@@ -587,8 +599,8 @@ proc request*(
|
||||
for r in resp.registrations:
|
||||
if limit == 0:
|
||||
return
|
||||
let ttl = r.ttl.get(MaximumTTL + 1)
|
||||
if ttl > MaximumTTL:
|
||||
let ttl = r.ttl.get(rdv.maxTTL + 1)
|
||||
if ttl > rdv.maxTTL:
|
||||
continue
|
||||
let
|
||||
spr = SignedPeerRecord.decode(r.signedPeerRecord).valueOr:
|
||||
@@ -596,7 +608,7 @@ proc request*(
|
||||
pr = spr.data
|
||||
if s.hasKey(pr.peerId):
|
||||
let (prSaved, rSaved) = s[pr.peerId]
|
||||
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(MaximumTTL) < ttl) or
|
||||
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(rdv.maxTTL) < ttl) or
|
||||
prSaved.seqNo < pr.seqNo:
|
||||
s[pr.peerId] = (pr, r)
|
||||
else:
|
||||
@@ -605,8 +617,6 @@ proc request*(
|
||||
for (_, r) in s.values():
|
||||
rdv.save(ns, peer, r, false)
|
||||
|
||||
# copy to avoid resizes during the loop
|
||||
let peers = rdv.peers
|
||||
for peer in peers:
|
||||
if limit == 0:
|
||||
break
|
||||
@@ -618,9 +628,14 @@ proc request*(
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception catch in request", error = exc.msg
|
||||
trace "exception catch in request", description = exc.msg
|
||||
return toSeq(s.values()).mapIt(it[0])
|
||||
|
||||
proc request*(
|
||||
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
|
||||
): Future[seq[PeerRecord]] {.async.} =
|
||||
await rdv.request(ns, l, rdv.peers)
|
||||
|
||||
proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
|
||||
let nsSalted = ns & rdv.salt
|
||||
try:
|
||||
@@ -630,26 +645,33 @@ proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
|
||||
# TODO: find a way to improve this, maybe something similar to the advertise
|
||||
proc unsubscribe*(rdv: RendezVous, ns: string, peerIds: seq[PeerId]) {.async.} =
|
||||
if ns.len notin 1 .. 255:
|
||||
raise newException(RendezVousError, "Invalid namespace")
|
||||
rdv.unsubscribeLocally(ns)
|
||||
|
||||
let msg = encode(
|
||||
Message(msgType: MessageType.Unregister, unregister: Opt.some(Unregister(ns: ns)))
|
||||
)
|
||||
|
||||
proc unsubscribePeer(rdv: RendezVous, peerId: PeerId) {.async.} =
|
||||
proc unsubscribePeer(peerId: PeerId) {.async.} =
|
||||
try:
|
||||
let conn = await rdv.switch.dial(peerId, RendezVousCodec)
|
||||
defer:
|
||||
await conn.close()
|
||||
await conn.writeLp(msg.buffer)
|
||||
except CatchableError as exc:
|
||||
trace "exception while unsubscribing", error = exc.msg
|
||||
trace "exception while unsubscribing", description = exc.msg
|
||||
|
||||
for peer in rdv.peers:
|
||||
discard await rdv.unsubscribePeer(peer).withTimeout(5.seconds)
|
||||
let futs = collect(newSeq()):
|
||||
for peer in peerIds:
|
||||
unsubscribePeer(peer)
|
||||
|
||||
discard await allFutures(futs).withTimeout(5.seconds)
|
||||
|
||||
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
|
||||
rdv.unsubscribeLocally(ns)
|
||||
|
||||
await rdv.unsubscribe(ns, rdv.peers)
|
||||
|
||||
proc setup*(rdv: RendezVous, switch: Switch) =
|
||||
rdv.switch = switch
|
||||
@@ -662,7 +684,25 @@ proc setup*(rdv: RendezVous, switch: Switch) =
|
||||
rdv.switch.addPeerEventHandler(handlePeer, Joined)
|
||||
rdv.switch.addPeerEventHandler(handlePeer, Left)
|
||||
|
||||
proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
|
||||
proc new*(
|
||||
T: typedesc[RendezVous],
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
minDuration = MinimumDuration,
|
||||
maxDuration = MaximumDuration,
|
||||
): T {.raises: [RendezVousError].} =
|
||||
if minDuration < 1.minutes:
|
||||
raise newException(RendezVousError, "TTL too short: 1 minute minimum")
|
||||
|
||||
if maxDuration > 72.hours:
|
||||
raise newException(RendezVousError, "TTL too long: 72 hours maximum")
|
||||
|
||||
if minDuration >= maxDuration:
|
||||
raise newException(RendezVousError, "Minimum TTL longer than maximum")
|
||||
|
||||
let
|
||||
minTTL = minDuration.seconds.uint64
|
||||
maxTTL = maxDuration.seconds.uint64
|
||||
|
||||
let rdv = T(
|
||||
rng: rng,
|
||||
salt: string.fromBytes(generateBytes(rng[], 8)),
|
||||
@@ -670,6 +710,10 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
|
||||
defaultDT: Moment.now() - 1.days,
|
||||
#registerEvent: newAsyncEvent(),
|
||||
sema: newAsyncSemaphore(SemaphoreDefaultSize),
|
||||
minDuration: minDuration,
|
||||
maxDuration: maxDuration,
|
||||
minTTL: minTTL,
|
||||
maxTTL: maxTTL,
|
||||
)
|
||||
logScope:
|
||||
topics = "libp2p discovery rendezvous"
|
||||
@@ -692,7 +736,7 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception in rendezvous handler", error = exc.msg
|
||||
trace "exception in rendezvous handler", description = exc.msg
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
@@ -701,9 +745,13 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
|
||||
return rdv
|
||||
|
||||
proc new*(
|
||||
T: typedesc[RendezVous], switch: Switch, rng: ref HmacDrbgContext = newRng()
|
||||
): T =
|
||||
let rdv = T.new(rng)
|
||||
T: typedesc[RendezVous],
|
||||
switch: Switch,
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
minDuration = MinimumDuration,
|
||||
maxDuration = MaximumDuration,
|
||||
): T {.raises: [RendezVousError].} =
|
||||
let rdv = T.new(rng, minDuration, maxDuration)
|
||||
rdv.setup(switch)
|
||||
return rdv
|
||||
|
||||
|
||||
@@ -155,7 +155,7 @@ method init*(s: Secure) =
|
||||
await conn.close()
|
||||
raise exc
|
||||
except LPStreamError as exc:
|
||||
warn "securing connection failed", err = exc.msg, conn
|
||||
warn "securing connection failed", description = exc.msg, conn
|
||||
await conn.close()
|
||||
|
||||
s.handler = handle
|
||||
|
||||
@@ -97,52 +97,49 @@ proc getWildcardMultiAddresses(
|
||||
|
||||
proc getWildcardAddress(
|
||||
maddress: MultiAddress,
|
||||
multiCodec: MultiCodec,
|
||||
anyAddr: openArray[uint8],
|
||||
addrFamily: AddressFamily,
|
||||
port: Port,
|
||||
networkInterfaceProvider: NetworkInterfaceProvider,
|
||||
): seq[MultiAddress] =
|
||||
var addresses: seq[MultiAddress]
|
||||
maddress.getProtocolArgument(multiCodec).withValue(address):
|
||||
if address == anyAddr:
|
||||
let filteredInterfaceAddresses = networkInterfaceProvider(addrFamily)
|
||||
addresses.add(
|
||||
getWildcardMultiAddresses(filteredInterfaceAddresses, IPPROTO_TCP, port)
|
||||
)
|
||||
else:
|
||||
addresses.add(maddress)
|
||||
return addresses
|
||||
let filteredInterfaceAddresses = networkInterfaceProvider(addrFamily)
|
||||
getWildcardMultiAddresses(filteredInterfaceAddresses, IPPROTO_TCP, port)
|
||||
|
||||
proc expandWildcardAddresses(
|
||||
networkInterfaceProvider: NetworkInterfaceProvider, listenAddrs: seq[MultiAddress]
|
||||
): seq[MultiAddress] =
|
||||
var addresses: seq[MultiAddress]
|
||||
|
||||
# In this loop we expand bound addresses like `0.0.0.0` and `::` to list of interface addresses.
|
||||
for listenAddr in listenAddrs:
|
||||
if TCP_IP.matchPartial(listenAddr):
|
||||
listenAddr.getProtocolArgument(multiCodec("tcp")).withValue(portArg):
|
||||
let port = Port(uint16.fromBytesBE(portArg))
|
||||
if IP4.matchPartial(listenAddr):
|
||||
let wildcardAddresses = getWildcardAddress(
|
||||
listenAddr,
|
||||
multiCodec("ip4"),
|
||||
AnyAddress.address_v4,
|
||||
AddressFamily.IPv4,
|
||||
port,
|
||||
networkInterfaceProvider,
|
||||
)
|
||||
addresses.add(wildcardAddresses)
|
||||
listenAddr.getProtocolArgument(multiCodec("ip4")).withValue(ip4):
|
||||
if ip4 == AnyAddress.address_v4:
|
||||
addresses.add(
|
||||
getWildcardAddress(
|
||||
listenAddr, AddressFamily.IPv4, port, networkInterfaceProvider
|
||||
)
|
||||
)
|
||||
else:
|
||||
addresses.add(listenAddr)
|
||||
elif IP6.matchPartial(listenAddr):
|
||||
let wildcardAddresses = getWildcardAddress(
|
||||
listenAddr,
|
||||
multiCodec("ip6"),
|
||||
AnyAddress6.address_v6,
|
||||
AddressFamily.IPv6,
|
||||
port,
|
||||
networkInterfaceProvider,
|
||||
)
|
||||
addresses.add(wildcardAddresses)
|
||||
listenAddr.getProtocolArgument(multiCodec("ip6")).withValue(ip6):
|
||||
if ip6 == AnyAddress6.address_v6:
|
||||
addresses.add(
|
||||
getWildcardAddress(
|
||||
listenAddr, AddressFamily.IPv6, port, networkInterfaceProvider
|
||||
)
|
||||
)
|
||||
# IPv6 dual stack
|
||||
addresses.add(
|
||||
getWildcardAddress(
|
||||
listenAddr, AddressFamily.IPv4, port, networkInterfaceProvider
|
||||
)
|
||||
)
|
||||
else:
|
||||
addresses.add(listenAddr)
|
||||
else:
|
||||
addresses.add(listenAddr)
|
||||
else:
|
||||
|
||||
@@ -326,4 +326,4 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async: (raises: []), public.} =
|
||||
except LPStreamEOFError:
|
||||
trace "Expected EOF came", s
|
||||
except LPStreamError as exc:
|
||||
debug "Unexpected error while waiting for EOF", s, msg = exc.msg
|
||||
debug "Unexpected error while waiting for EOF", s, description = exc.msg
|
||||
|
||||
@@ -219,7 +219,7 @@ proc upgradeMonitor(
|
||||
libp2p_failed_upgrades_incoming.inc()
|
||||
if not isNil(conn):
|
||||
await conn.close()
|
||||
trace "Exception awaiting connection upgrade", exc = exc.msg, conn
|
||||
trace "Exception awaiting connection upgrade", description = exc.msg, conn
|
||||
finally:
|
||||
upgrades.release()
|
||||
|
||||
@@ -264,7 +264,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
|
||||
upgrades.release() # always release the slot
|
||||
return
|
||||
except CatchableError as exc:
|
||||
error "Exception in accept loop, exiting", exc = exc.msg
|
||||
error "Exception in accept loop, exiting", description = exc.msg
|
||||
upgrades.release() # always release the slot
|
||||
if not isNil(conn):
|
||||
await conn.close()
|
||||
@@ -282,7 +282,7 @@ proc stop*(s: Switch) {.async, public.} =
|
||||
# Stop accepting incoming connections
|
||||
await allFutures(s.acceptFuts.mapIt(it.cancelAndWait())).wait(1.seconds)
|
||||
except CatchableError as exc:
|
||||
debug "Cannot cancel accepts", error = exc.msg
|
||||
debug "Cannot cancel accepts", description = exc.msg
|
||||
|
||||
for service in s.services:
|
||||
discard await service.stop(s)
|
||||
@@ -296,7 +296,7 @@ proc stop*(s: Switch) {.async, public.} =
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
warn "error cleaning up transports", msg = exc.msg
|
||||
warn "error cleaning up transports", description = exc.msg
|
||||
|
||||
await s.ms.stop()
|
||||
|
||||
|
||||
224
libp2p/transports/quictransport.nim
Normal file
224
libp2p/transports/quictransport.nim
Normal file
@@ -0,0 +1,224 @@
|
||||
import std/sequtils
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/quic
|
||||
import ../multiaddress
|
||||
import ../multicodec
|
||||
import ../stream/connection
|
||||
import ../wire
|
||||
import ../muxers/muxer
|
||||
import ../upgrademngrs/upgrade
|
||||
import ./transport
|
||||
|
||||
export multiaddress
|
||||
export multicodec
|
||||
export connection
|
||||
export transport
|
||||
|
||||
logScope:
|
||||
topics = "libp2p quictransport"
|
||||
|
||||
type
|
||||
P2PConnection = connection.Connection
|
||||
QuicConnection = quic.Connection
|
||||
|
||||
# Stream
|
||||
type QuicStream* = ref object of P2PConnection
|
||||
stream: Stream
|
||||
cached: seq[byte]
|
||||
|
||||
proc new(
|
||||
_: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId
|
||||
): QuicStream =
|
||||
let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId)
|
||||
procCall P2PConnection(quicstream).initStream()
|
||||
quicstream
|
||||
|
||||
template mapExceptions(body: untyped) =
|
||||
try:
|
||||
body
|
||||
except QuicError:
|
||||
raise newLPStreamEOFError()
|
||||
except CatchableError:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
method readOnce*(
|
||||
stream: QuicStream, pbytes: pointer, nbytes: int
|
||||
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
try:
|
||||
if stream.cached.len == 0:
|
||||
stream.cached = await stream.stream.read()
|
||||
result = min(nbytes, stream.cached.len)
|
||||
copyMem(pbytes, addr stream.cached[0], result)
|
||||
stream.cached = stream.cached[result ..^ 1]
|
||||
except CatchableError as exc:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
{.push warning[LockLevel]: off.}
|
||||
method write*(
|
||||
stream: QuicStream, bytes: seq[byte]
|
||||
) {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
mapExceptions(await stream.stream.write(bytes))
|
||||
|
||||
{.pop.}
|
||||
|
||||
method closeImpl*(stream: QuicStream) {.async: (raises: []).} =
|
||||
try:
|
||||
await stream.stream.close()
|
||||
except CatchableError as exc:
|
||||
discard
|
||||
await procCall P2PConnection(stream).closeImpl()
|
||||
|
||||
# Session
|
||||
type QuicSession* = ref object of P2PConnection
|
||||
connection: QuicConnection
|
||||
|
||||
method close*(session: QuicSession) {.async, base.} =
|
||||
await session.connection.close()
|
||||
await procCall P2PConnection(session).close()
|
||||
|
||||
proc getStream*(
|
||||
session: QuicSession, direction = Direction.In
|
||||
): Future[QuicStream] {.async.} =
|
||||
var stream: Stream
|
||||
case direction
|
||||
of Direction.In:
|
||||
stream = await session.connection.incomingStream()
|
||||
of Direction.Out:
|
||||
stream = await session.connection.openStream()
|
||||
await stream.write(@[]) # QUIC streams do not exist until data is sent
|
||||
return QuicStream.new(stream, session.observedAddr, session.peerId)
|
||||
|
||||
method getWrapped*(self: QuicSession): P2PConnection =
|
||||
nil
|
||||
|
||||
# Muxer
|
||||
type QuicMuxer = ref object of Muxer
|
||||
quicSession: QuicSession
|
||||
handleFut: Future[void]
|
||||
|
||||
method newStream*(
|
||||
m: QuicMuxer, name: string = "", lazy: bool = false
|
||||
): Future[P2PConnection] {.
|
||||
async: (raises: [CancelledError, LPStreamError, MuxerError])
|
||||
.} =
|
||||
try:
|
||||
return await m.quicSession.getStream(Direction.Out)
|
||||
except CatchableError as exc:
|
||||
raise newException(MuxerError, exc.msg, exc)
|
||||
|
||||
proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} =
|
||||
## call the muxer stream handler for this channel
|
||||
##
|
||||
try:
|
||||
await m.streamHandler(chann)
|
||||
trace "finished handling stream"
|
||||
doAssert(chann.closed, "connection not closed by handler!")
|
||||
except CatchableError as exc:
|
||||
trace "Exception in mplex stream handler", msg = exc.msg
|
||||
await chann.close()
|
||||
|
||||
method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} =
|
||||
try:
|
||||
while not m.quicSession.atEof:
|
||||
let incomingStream = await m.quicSession.getStream(Direction.In)
|
||||
asyncSpawn m.handleStream(incomingStream)
|
||||
except CatchableError as exc:
|
||||
trace "Exception in mplex handler", msg = exc.msg
|
||||
|
||||
method close*(m: QuicMuxer) {.async: (raises: []).} =
|
||||
try:
|
||||
await m.quicSession.close()
|
||||
m.handleFut.cancel()
|
||||
except CatchableError as exc:
|
||||
discard
|
||||
|
||||
# Transport
|
||||
type QuicUpgrade = ref object of Upgrade
|
||||
|
||||
type QuicTransport* = ref object of Transport
|
||||
listener: Listener
|
||||
connections: seq[P2PConnection]
|
||||
|
||||
func new*(_: type QuicTransport, u: Upgrade): QuicTransport =
|
||||
QuicTransport(upgrader: QuicUpgrade(ms: u.ms))
|
||||
|
||||
method handles*(transport: QuicTransport, address: MultiAddress): bool =
|
||||
if not procCall Transport(transport).handles(address):
|
||||
return false
|
||||
QUIC_V1.match(address)
|
||||
|
||||
method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} =
|
||||
doAssert transport.listener.isNil, "start() already called"
|
||||
#TODO handle multiple addr
|
||||
transport.listener = listen(initTAddress(addrs[0]).tryGet)
|
||||
await procCall Transport(transport).start(addrs)
|
||||
transport.addrs[0] =
|
||||
MultiAddress.init(transport.listener.localAddress(), IPPROTO_UDP).tryGet() &
|
||||
MultiAddress.init("/quic-v1").get()
|
||||
transport.running = true
|
||||
|
||||
method stop*(transport: QuicTransport) {.async.} =
|
||||
if transport.running:
|
||||
for c in transport.connections:
|
||||
await c.close()
|
||||
await procCall Transport(transport).stop()
|
||||
await transport.listener.stop()
|
||||
transport.running = false
|
||||
transport.listener = nil
|
||||
|
||||
proc wrapConnection(
|
||||
transport: QuicTransport, connection: QuicConnection
|
||||
): P2PConnection {.raises: [Defect, TransportOsError, LPError].} =
|
||||
let
|
||||
remoteAddr = connection.remoteAddress()
|
||||
observedAddr =
|
||||
MultiAddress.init(remoteAddr, IPPROTO_UDP).get() &
|
||||
MultiAddress.init("/quic-v1").get()
|
||||
conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr))
|
||||
conres.initStream()
|
||||
|
||||
transport.connections.add(conres)
|
||||
proc onClose() {.async.} =
|
||||
await conres.join()
|
||||
transport.connections.keepItIf(it != conres)
|
||||
trace "Cleaned up client"
|
||||
|
||||
asyncSpawn onClose()
|
||||
return conres
|
||||
|
||||
method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} =
|
||||
doAssert not transport.listener.isNil, "call start() before calling accept()"
|
||||
let connection = await transport.listener.accept()
|
||||
return transport.wrapConnection(connection)
|
||||
|
||||
method dial*(
|
||||
transport: QuicTransport,
|
||||
hostname: string,
|
||||
address: MultiAddress,
|
||||
peerId: Opt[PeerId] = Opt.none(PeerId),
|
||||
): Future[P2PConnection] {.async, gcsafe.} =
|
||||
let connection = await dial(initTAddress(address).tryGet)
|
||||
return transport.wrapConnection(connection)
|
||||
|
||||
method upgrade*(
|
||||
self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId]
|
||||
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
|
||||
let qs = QuicSession(conn)
|
||||
if peerId.isSome:
|
||||
qs.peerId = peerId.get()
|
||||
|
||||
let muxer = QuicMuxer(quicSession: qs, connection: conn)
|
||||
muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} =
|
||||
trace "Starting stream handler"
|
||||
try:
|
||||
await self.upgrader.ms.handle(conn) # handle incoming connection
|
||||
except CancelledError as exc:
|
||||
return
|
||||
except CatchableError as exc:
|
||||
trace "exception in stream handler", conn, msg = exc.msg
|
||||
finally:
|
||||
await conn.closeWithEOF()
|
||||
trace "Stream handler done", conn
|
||||
muxer.handleFut = muxer.handle()
|
||||
return muxer
|
||||
@@ -226,51 +226,65 @@ method accept*(self: TcpTransport): Future[Connection] =
|
||||
proc impl(
|
||||
self: TcpTransport
|
||||
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
|
||||
proc cancelAcceptFuts() =
|
||||
for fut in self.acceptFuts:
|
||||
if not fut.completed():
|
||||
fut.cancel()
|
||||
|
||||
if not self.running:
|
||||
raise newTransportClosedError()
|
||||
|
||||
if self.acceptFuts.len <= 0:
|
||||
if self.servers.len == 0:
|
||||
raise (ref TcpTransportError)(msg: "No listeners configured")
|
||||
elif self.acceptFuts.len == 0:
|
||||
# Holds futures representing ongoing accept calls on multiple servers.
|
||||
self.acceptFuts = self.servers.mapIt(it.accept())
|
||||
|
||||
let
|
||||
finished =
|
||||
try:
|
||||
# Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers.
|
||||
await one(self.acceptFuts)
|
||||
except ValueError:
|
||||
raise (ref TcpTransportError)(msg: "No listeners configured")
|
||||
|
||||
index = self.acceptFuts.find(finished)
|
||||
transp =
|
||||
try:
|
||||
await finished
|
||||
except TransportTooManyError as exc:
|
||||
debug "Too many files opened", exc = exc.msg
|
||||
return nil
|
||||
except TransportAbortedError as exc:
|
||||
debug "Connection aborted", exc = exc.msg
|
||||
return nil
|
||||
except TransportUseClosedError as exc:
|
||||
raise newTransportClosedError(exc)
|
||||
except TransportOsError as exc:
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
except common.TransportError as exc: # Needed for chronos 4.0.0 support
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
raiseAssert "Accept futures should not be empty"
|
||||
except CancelledError as exc:
|
||||
cancelAcceptFuts()
|
||||
raise exc
|
||||
index = self.acceptFuts.find(finished)
|
||||
|
||||
# A new connection has been accepted. The corresponding server should immediately start accepting another connection.
|
||||
# Thus we replace the completed future with a new one by calling accept on the same server again.
|
||||
self.acceptFuts[index] = self.servers[index].accept()
|
||||
let transp =
|
||||
try:
|
||||
await finished
|
||||
except TransportTooManyError as exc:
|
||||
debug "Too many files opened", description = exc.msg
|
||||
return nil
|
||||
except TransportAbortedError as exc:
|
||||
debug "Connection aborted", description = exc.msg
|
||||
return nil
|
||||
except TransportUseClosedError as exc:
|
||||
raise newTransportClosedError(exc)
|
||||
except TransportOsError as exc:
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
except common.TransportError as exc: # Needed for chronos 4.0.0 support
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
except CancelledError as exc:
|
||||
cancelAcceptFuts()
|
||||
raise exc
|
||||
|
||||
if not self.running: # Stopped while waiting
|
||||
await transp.closeWait()
|
||||
raise newTransportClosedError()
|
||||
|
||||
self.acceptFuts[index] = self.servers[index].accept()
|
||||
|
||||
let remote =
|
||||
try:
|
||||
transp.remoteAddress
|
||||
except TransportOsError as exc:
|
||||
# The connection had errors / was closed before `await` returned control
|
||||
await transp.closeWait()
|
||||
debug "Cannot read remote address", exc = exc.msg
|
||||
debug "Cannot read remote address", description = exc.msg
|
||||
return nil
|
||||
|
||||
let observedAddr =
|
||||
|
||||
@@ -56,7 +56,7 @@ proc new*(
|
||||
stream.initStream()
|
||||
return stream
|
||||
|
||||
template mapExceptions(body: untyped) =
|
||||
template mapExceptions(body: untyped): untyped =
|
||||
try:
|
||||
body
|
||||
except AsyncStreamIncompleteError:
|
||||
@@ -206,7 +206,7 @@ method stop*(self: WsTransport) {.async.} =
|
||||
self.httpservers = @[]
|
||||
trace "Transport stopped"
|
||||
except CatchableError as exc:
|
||||
trace "Error shutting down ws transport", exc = exc.msg
|
||||
trace "Error shutting down ws transport", description = exc.msg
|
||||
|
||||
proc connHandler(
|
||||
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
|
||||
@@ -223,7 +223,7 @@ proc connHandler(
|
||||
|
||||
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet()
|
||||
except CatchableError as exc:
|
||||
trace "Failed to create observedAddr", exc = exc.msg
|
||||
trace "Failed to create observedAddr", description = exc.msg
|
||||
if not (isNil(stream) and stream.stream.reader.closed):
|
||||
await stream.close()
|
||||
raise exc
|
||||
@@ -271,26 +271,26 @@ method accept*(self: WsTransport): Future[Connection] {.async.} =
|
||||
await req.stream.closeWait()
|
||||
raise exc
|
||||
except WebSocketError as exc:
|
||||
debug "Websocket Error", exc = exc.msg
|
||||
debug "Websocket Error", description = exc.msg
|
||||
except HttpError as exc:
|
||||
debug "Http Error", exc = exc.msg
|
||||
debug "Http Error", description = exc.msg
|
||||
except AsyncStreamError as exc:
|
||||
debug "AsyncStream Error", exc = exc.msg
|
||||
debug "AsyncStream Error", description = exc.msg
|
||||
except TransportTooManyError as exc:
|
||||
debug "Too many files opened", exc = exc.msg
|
||||
debug "Too many files opened", description = exc.msg
|
||||
except TransportAbortedError as exc:
|
||||
debug "Connection aborted", exc = exc.msg
|
||||
debug "Connection aborted", description = exc.msg
|
||||
except AsyncTimeoutError as exc:
|
||||
debug "Timed out", exc = exc.msg
|
||||
debug "Timed out", description = exc.msg
|
||||
except TransportUseClosedError as exc:
|
||||
debug "Server was closed", exc = exc.msg
|
||||
debug "Server was closed", description = exc.msg
|
||||
raise newTransportClosedError(exc)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except TransportOsError as exc:
|
||||
debug "OS Error", exc = exc.msg
|
||||
debug "OS Error", description = exc.msg
|
||||
except CatchableError as exc:
|
||||
info "Unexpected error accepting connection", exc = exc.msg
|
||||
info "Unexpected error accepting connection", description = exc.msg
|
||||
raise exc
|
||||
|
||||
method dial*(
|
||||
|
||||
@@ -59,7 +59,8 @@ when defined(libp2p_agents_metrics):
|
||||
KnownLibP2PAgents* {.strdefine.} = "nim-libp2p"
|
||||
KnownLibP2PAgentsSeq* = KnownLibP2PAgents.safeToLowerAscii().tryGet().split(",")
|
||||
|
||||
template safeConvert*[T: SomeInteger, S: Ordinal](value: S): T =
|
||||
proc safeConvert*[T: SomeInteger](value: SomeOrdinal): T =
|
||||
type S = typeof(value)
|
||||
## Converts `value` from S to `T` iff `value` is guaranteed to be preserved.
|
||||
when int64(T.low) <= int64(S.low()) and uint64(T.high) >= uint64(S.high):
|
||||
T(value)
|
||||
|
||||
@@ -20,7 +20,7 @@ when defined(windows): import winlean else: import posix
|
||||
const
|
||||
RTRANSPMA* = mapOr(TCP, WebSockets, UNIX)
|
||||
|
||||
TRANSPMA* = mapOr(RTRANSPMA, UDP)
|
||||
TRANSPMA* = mapOr(RTRANSPMA, QUIC, UDP)
|
||||
|
||||
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
|
||||
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
|
||||
@@ -28,7 +28,7 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
|
||||
## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``.
|
||||
##
|
||||
|
||||
if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP).match(ma):
|
||||
if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP, QUIC_V1_IP).match(ma):
|
||||
var pbuf: array[2, byte]
|
||||
let code = (?(?ma[0]).protoCode())
|
||||
if code == multiCodec("unix"):
|
||||
|
||||
@@ -144,7 +144,7 @@ template commonTransportTest*(prov: TransportProvider, ma1: string, ma2: string
|
||||
let transport1 = transpProvider()
|
||||
await transport1.start(addrs)
|
||||
|
||||
proc acceptHandler() {.async.} =
|
||||
proc acceptHandler() {.async, gensym.} =
|
||||
while true:
|
||||
let conn = await transport1.accept()
|
||||
await conn.write(newSeq[byte](0))
|
||||
@@ -208,7 +208,7 @@ template commonTransportTest*(prov: TransportProvider, ma1: string, ma2: string
|
||||
let transport1 = transpProvider()
|
||||
await transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async.} =
|
||||
proc acceptHandler() {.async, gensym.} =
|
||||
let conn = await transport1.accept()
|
||||
await conn.close()
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ import strutils, os
|
||||
libp2p_mplex_metrics
|
||||
--d:
|
||||
unittestPrintTime
|
||||
--skipParentCfg
|
||||
|
||||
# Only add chronicles param if the
|
||||
# user didn't specify any
|
||||
|
||||
@@ -120,7 +120,7 @@ proc main() {.async.} =
|
||||
echo &"""{{"rtt_to_holepunched_peer_millis":{delay.millis}}}"""
|
||||
quit(0)
|
||||
except CatchableError as e:
|
||||
error "Unexpected error", msg = e.msg
|
||||
error "Unexpected error", description = e.msg
|
||||
|
||||
discard waitFor(main().withTimeout(4.minutes))
|
||||
quit(1)
|
||||
|
||||
@@ -677,55 +677,76 @@ suite "GossipSub internal":
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.subscribe(topic, handler2)
|
||||
|
||||
# Instantiates 30 peers and connects all of them to the previously defined `gossipSub`
|
||||
for i in 0 ..< 30:
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
# Add the connection to `gossipSub`, to their `gossipSub.gossipsub` and `gossipSub.mesh` tables
|
||||
gossipSub.grafted(peer, topic)
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
|
||||
# Peers with no budget should not request messages
|
||||
block:
|
||||
# should ignore no budget peer
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
|
||||
peer.iHaveBudget = 0
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
check:
|
||||
iwants.messageIDs.len == 0
|
||||
|
||||
block:
|
||||
# given duplicate ihave should generate only one iwant
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
check:
|
||||
iwants.messageIDs.len == 1
|
||||
|
||||
block:
|
||||
# given duplicate iwant should generate only one message
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
# Add message to `gossipSub`'s message cache
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
gossipSub.mcache.put(id, Message())
|
||||
peer.sentIHaves[^1].incl(id)
|
||||
# Build an IHAVE message that contains the same message ID three times
|
||||
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
|
||||
# Given the peer has no budget to request messages
|
||||
peer.iHaveBudget = 0
|
||||
# When a peer makes an IHAVE request for the a message that `gossipSub` has
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
# Then `gossipSub` should not generate an IWant message for the message,
|
||||
check:
|
||||
iwants.messageIDs.len == 0
|
||||
|
||||
# Peers with budget should request messages. If ids are repeated, only one request should be generated
|
||||
block:
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
# Build an IHAVE message that contains the same message ID three times
|
||||
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
|
||||
# Given the budget is not 0 (because it's not been overridden)
|
||||
# When a peer makes an IHAVE request for the a message that `gossipSub` does not have
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
# Then `gossipSub` should generate an IWant message for the message
|
||||
check:
|
||||
iwants.messageIDs.len == 1
|
||||
|
||||
# Peers with budget should request messages. If ids are repeated, only one request should be generated
|
||||
block:
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
# Add message to `gossipSub`'s message cache
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
gossipSub.mcache.put(id, Message())
|
||||
peer.sentIHaves[^1].incl(id)
|
||||
# Build an IWANT message that contains the same message ID three times
|
||||
let msg = ControlIWant(messageIDs: @[id, id, id])
|
||||
# When a peer makes an IWANT request for the a message that `gossipSub` has
|
||||
let genmsg = gossipSub.handleIWant(peer, @[msg])
|
||||
# Then `gossipSub` should return the message
|
||||
check:
|
||||
genmsg.len == 1
|
||||
|
||||
@@ -773,7 +794,7 @@ suite "GossipSub internal":
|
||||
var sentMessages = initHashSet[seq[byte]]()
|
||||
|
||||
for i, size in enumerate([size1, size2]):
|
||||
let data = newSeqWith[byte](size, i.byte)
|
||||
let data = newSeqWith(size, i.byte)
|
||||
sentMessages.incl(data)
|
||||
|
||||
let msg =
|
||||
|
||||
@@ -220,6 +220,63 @@ suite "GossipSub":
|
||||
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
|
||||
asyncTest "GossipSub's observers should run after message is sent, received and validated":
|
||||
var
|
||||
recvCounter = 0
|
||||
sendCounter = 0
|
||||
validatedCounter = 0
|
||||
|
||||
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||
discard
|
||||
|
||||
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||
inc recvCounter
|
||||
|
||||
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||
inc sendCounter
|
||||
|
||||
proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
|
||||
inc validatedCounter
|
||||
|
||||
let obs0 = PubSubObserver(onSend: onSend)
|
||||
let obs1 = PubSubObserver(onRecv: onRecv, onValidated: onValidated)
|
||||
|
||||
let nodes = generateNodes(2, gossip = true)
|
||||
# start switches
|
||||
discard await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
nodes[0].addObserver(obs0)
|
||||
nodes[1].addObserver(obs1)
|
||||
nodes[1].subscribe("foo", handler)
|
||||
nodes[1].subscribe("bar", handler)
|
||||
|
||||
proc validator(
|
||||
topic: string, message: Message
|
||||
): Future[ValidationResult] {.async.} =
|
||||
result = if topic == "foo": ValidationResult.Accept else: ValidationResult.Reject
|
||||
|
||||
nodes[1].addValidator("foo", "bar", validator)
|
||||
|
||||
# Send message that will be accepted by the receiver's validator
|
||||
tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1
|
||||
|
||||
check:
|
||||
recvCounter == 1
|
||||
validatedCounter == 1
|
||||
sendCounter == 1
|
||||
|
||||
# Send message that will be rejected by the receiver's validator
|
||||
tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1
|
||||
|
||||
check:
|
||||
recvCounter == 2
|
||||
validatedCounter == 1
|
||||
sendCounter == 2
|
||||
|
||||
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
|
||||
|
||||
asyncTest "GossipSub unsub - resub faster than backoff":
|
||||
var handlerFut = newFuture[bool]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||
@@ -1002,7 +1059,7 @@ suite "GossipSub":
|
||||
|
||||
# Simulate sending an undecodable message
|
||||
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(
|
||||
newSeqWith[byte](33, 1.byte), isHighPriority = true
|
||||
newSeqWith(33, 1.byte), isHighPriority = true
|
||||
)
|
||||
await sleepAsync(300.millis)
|
||||
|
||||
@@ -1012,7 +1069,7 @@ suite "GossipSub":
|
||||
# Disconnect peer when rate limiting is enabled
|
||||
gossip1.parameters.disconnectPeerAboveRateLimit = true
|
||||
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(
|
||||
newSeqWith[byte](35, 1.byte), isHighPriority = true
|
||||
newSeqWith(35, 1.byte), isHighPriority = true
|
||||
)
|
||||
|
||||
checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
|
||||
|
||||
@@ -26,10 +26,11 @@ const
|
||||
SuccessVectors = [
|
||||
"/ip4/1.2.3.4", "/ip4/0.0.0.0", "/ip6/::1",
|
||||
"/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21",
|
||||
"/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic", "/ip6zone/x/ip6/fe80::1",
|
||||
"/ip6zone/x%y/ip6/fe80::1", "/ip6zone/x%y/ip6/::",
|
||||
"/ip6zone/x/ip6/fe80::1/udp/1234/quic", "/onion/timaq4ygg2iegci7:1234",
|
||||
"/onion/timaq4ygg2iegci7:80/http",
|
||||
"/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic",
|
||||
"/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic-v1",
|
||||
"/ip6zone/x/ip6/fe80::1", "/ip6zone/x%y/ip6/fe80::1", "/ip6zone/x%y/ip6/::",
|
||||
"/ip6zone/x/ip6/fe80::1/udp/1234/quic", "/ip6zone/x/ip6/fe80::1/udp/1234/quic-v1",
|
||||
"/onion/timaq4ygg2iegci7:1234", "/onion/timaq4ygg2iegci7:80/http",
|
||||
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234",
|
||||
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:80/http",
|
||||
"/udp/0", "/tcp/0", "/sctp/0", "/udp/1234", "/tcp/1234", "/sctp/1234", "/udp/65535",
|
||||
@@ -57,7 +58,7 @@ const
|
||||
FailureVectors = [
|
||||
"", "/", "/ip4", "/ip4/::1", "/ip4/fdpsofodsajfdoisa", "/ip6", "/ip6zone",
|
||||
"/ip6zone/", "/ip6zone//ip6/fe80::1", "/udp", "/tcp", "/sctp", "/udp/65536",
|
||||
"/tcp/65536", "/quic/65536", "/onion/9imaq4ygg2iegci7:80",
|
||||
"/tcp/65536", "/quic/65536", "/quic-v1/65536", "/onion/9imaq4ygg2iegci7:80",
|
||||
"/onion/aaimaq4ygg2iegci7:80", "/onion/timaq4ygg2iegci7:0",
|
||||
"/onion/timaq4ygg2iegci7:-1", "/onion/timaq4ygg2iegci7",
|
||||
"/onion/timaq4ygg2iegci@:666",
|
||||
@@ -70,8 +71,8 @@ const
|
||||
"/udp/1234/sctp", "/udp/1234/udt/1234", "/udp/1234/utp/1234",
|
||||
"/ip4/127.0.0.1/udp/jfodsajfidosajfoidsa", "/ip4/127.0.0.1/udp",
|
||||
"/ip4/127.0.0.1/tcp/jfodsajfidosajfoidsa", "/ip4/127.0.0.1/tcp",
|
||||
"/ip4/127.0.0.1/quic/1234", "/ip4/127.0.0.1/ipfs", "/ip4/127.0.0.1/ipfs/tcp",
|
||||
"/ip4/127.0.0.1/p2p", "/ip4/127.0.0.1/p2p/tcp", "/unix",
|
||||
"/ip4/127.0.0.1/quic/1234", "/ip4/127.0.0.1/quic-v1/1234", "/ip4/127.0.0.1/ipfs",
|
||||
"/ip4/127.0.0.1/ipfs/tcp", "/ip4/127.0.0.1/p2p", "/ip4/127.0.0.1/p2p/tcp", "/unix",
|
||||
]
|
||||
|
||||
RustSuccessVectors = [
|
||||
@@ -160,6 +161,15 @@ const
|
||||
"/quic",
|
||||
],
|
||||
),
|
||||
PatternVector(
|
||||
pattern: QUIC_V1,
|
||||
good: @["/ip4/1.2.3.4/udp/1234/quic-v1", "/ip6/::/udp/1234/quic-v1"],
|
||||
bad:
|
||||
@[
|
||||
"/ip4/0.0.0.0/tcp/12345/quic-v1", "/ip6/fc00::/ip4/0.0.0.0/udp/1234/quic-v1",
|
||||
"/quic-v1",
|
||||
],
|
||||
),
|
||||
PatternVector(
|
||||
pattern: IPFS,
|
||||
good:
|
||||
|
||||
@@ -24,4 +24,5 @@ import
|
||||
testbufferstream, testidentify, testobservedaddrmanager, testconnmngr, testswitch,
|
||||
testnoise, testpeerinfo, testpeerstore, testping, testmplex, testrelayv1, testrelayv2,
|
||||
testrendezvous, testdiscovery, testyamux, testautonat, testautonatservice,
|
||||
testautorelay, testdcutr, testhpservice, testutility, testhelpers
|
||||
testautorelay, testdcutr, testhpservice, testutility, testhelpers,
|
||||
testwildcardresolverservice
|
||||
|
||||
24
tests/testquic.nim
Normal file
24
tests/testquic.nim
Normal file
@@ -0,0 +1,24 @@
|
||||
{.used.}
|
||||
|
||||
import sequtils
|
||||
import chronos, stew/byteutils
|
||||
import
|
||||
../libp2p/[
|
||||
stream/connection,
|
||||
transports/transport,
|
||||
transports/quictransport,
|
||||
upgrademngrs/upgrade,
|
||||
multiaddress,
|
||||
errors,
|
||||
wire,
|
||||
]
|
||||
|
||||
import ./helpers, ./commontransport
|
||||
|
||||
suite "Quic transport":
|
||||
asyncTest "can handle local address":
|
||||
let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()]
|
||||
let transport1 = QuicTransport.new()
|
||||
await transport1.start(ma)
|
||||
check transport1.handles(transport1.addrs[0])
|
||||
await transport1.stop()
|
||||
@@ -126,7 +126,7 @@ suite "RendezVous":
|
||||
|
||||
asyncTest "Various local error":
|
||||
let
|
||||
rdv = RendezVous.new()
|
||||
rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
|
||||
switch = createSwitch(rdv)
|
||||
expect RendezVousError:
|
||||
discard await rdv.request("A".repeat(300))
|
||||
@@ -137,6 +137,14 @@ suite "RendezVous":
|
||||
expect RendezVousError:
|
||||
await rdv.advertise("A".repeat(300))
|
||||
expect RendezVousError:
|
||||
await rdv.advertise("A", 2.weeks)
|
||||
await rdv.advertise("A", 73.hours)
|
||||
expect RendezVousError:
|
||||
await rdv.advertise("A", 5.minutes)
|
||||
await rdv.advertise("A", 30.seconds)
|
||||
|
||||
test "Various config error":
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(minDuration = 30.seconds)
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(maxDuration = 73.hours)
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)
|
||||
|
||||
@@ -34,6 +34,7 @@ import
|
||||
utils/semaphore,
|
||||
transports/tcptransport,
|
||||
transports/wstransport,
|
||||
transports/quictransport,
|
||||
]
|
||||
import ./helpers
|
||||
|
||||
@@ -988,6 +989,42 @@ suite "Switch":
|
||||
await srcWsSwitch.stop()
|
||||
await srcTcpSwitch.stop()
|
||||
|
||||
asyncTest "e2e quic transport":
|
||||
let
|
||||
quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()
|
||||
quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()
|
||||
|
||||
srcSwitch = SwitchBuilder
|
||||
.new()
|
||||
.withAddress(quicAddress1)
|
||||
.withRng(crypto.newRng())
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
QuicTransport.new(upgr)
|
||||
)
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
destSwitch = SwitchBuilder
|
||||
.new()
|
||||
.withAddress(quicAddress2)
|
||||
.withRng(crypto.newRng())
|
||||
.withTransport(
|
||||
proc(upgr: Upgrade): Transport =
|
||||
QuicTransport.new(upgr)
|
||||
)
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
await destSwitch.start()
|
||||
await srcSwitch.start()
|
||||
|
||||
await srcSwitch.connect(destSwitch.peerInfo.peerId, destSwitch.peerInfo.addrs)
|
||||
check srcSwitch.isConnected(destSwitch.peerInfo.peerId)
|
||||
|
||||
await destSwitch.stop()
|
||||
await srcSwitch.stop()
|
||||
|
||||
asyncTest "mount unstarted protocol":
|
||||
proc handle(conn: Connection, proto: string) {.async.} =
|
||||
check "test123" == string.fromBytes(await conn.readLp(1024))
|
||||
|
||||
@@ -17,46 +17,42 @@ import ../libp2p/utility
|
||||
suite "Utility":
|
||||
|
||||
test "successful safeConvert from int8 to int16":
|
||||
let res = safeConvert[int16, int8]((-128).int8)
|
||||
let res = safeConvert[int16]((-128).int8)
|
||||
check res == -128'i16
|
||||
|
||||
test "unsuccessful safeConvert from int16 to int8":
|
||||
check not (compiles do:
|
||||
result: int8 = safeConvert[int8, int16](32767'i16))
|
||||
result: int8 = safeConvert[int8](32767'i16))
|
||||
|
||||
test "successful safeConvert from uint8 to uint16":
|
||||
let res: uint16 = safeConvert[uint16, uint8](255'u8)
|
||||
let res: uint16 = safeConvert[uint16](255'u8)
|
||||
check res == 255'u16
|
||||
|
||||
test "unsuccessful safeConvert from uint16 to uint8":
|
||||
check not (compiles do:
|
||||
result: uint8 = safeConvert[uint8, uint16](256'u16))
|
||||
|
||||
test "successful safeConvert from char to int":
|
||||
let res: int = safeConvert[int, char]('A')
|
||||
check res == 65
|
||||
result: uint8 = safeConvert[uint8](256'u16))
|
||||
|
||||
test "unsuccessful safeConvert from int to char":
|
||||
check not (compiles do:
|
||||
result: char = safeConvert[char, int](128))
|
||||
result: char = safeConvert[char](128))
|
||||
|
||||
test "successful safeConvert from bool to int":
|
||||
let res: int = safeConvert[int, bool](true)
|
||||
let res: int = safeConvert[int](true)
|
||||
check res == 1
|
||||
|
||||
test "unsuccessful safeConvert from int to bool":
|
||||
check not (compiles do:
|
||||
result: bool = safeConvert[bool, int](2))
|
||||
result: bool = safeConvert[bool](2))
|
||||
|
||||
test "successful safeConvert from enum to int":
|
||||
type Color = enum red, green, blue
|
||||
let res: int = safeConvert[int, Color](green)
|
||||
let res: int = safeConvert[int](green)
|
||||
check res == 1
|
||||
|
||||
test "unsuccessful safeConvert from int to enum":
|
||||
type Color = enum red, green, blue
|
||||
check not (compiles do:
|
||||
result: Color = safeConvert[Color, int](3))
|
||||
result: Color = safeConvert[Color](3))
|
||||
|
||||
test "successful safeConvert from range to int":
|
||||
let res: int = safeConvert[int, range[1..10]](5)
|
||||
@@ -68,11 +64,11 @@ suite "Utility":
|
||||
|
||||
test "unsuccessful safeConvert from int to uint":
|
||||
check not (compiles do:
|
||||
result: uint = safeConvert[uint, int](11))
|
||||
result: uint = safeConvert[uint](11))
|
||||
|
||||
test "unsuccessful safeConvert from uint to int":
|
||||
check not (compiles do:
|
||||
result: uint = safeConvert[int, uint](11.uint))
|
||||
result: uint = safeConvert[int](11.uint))
|
||||
|
||||
suite "withValue and valueOr templates":
|
||||
type
|
||||
|
||||
@@ -38,18 +38,11 @@ proc getAddressesMock(
|
||||
echo "Error: " & $e.msg
|
||||
fail()
|
||||
|
||||
proc createSwitch(svc: Service): Switch =
|
||||
proc createSwitch(svc: Service, addrs: seq[MultiAddress]): Switch =
|
||||
SwitchBuilder
|
||||
.new()
|
||||
.withRng(newRng())
|
||||
.withAddresses(
|
||||
@[
|
||||
MultiAddress.init("/ip4/127.0.0.1/tcp/0/").tryGet(),
|
||||
MultiAddress.init("/ip4/0.0.0.0/tcp/0/").tryGet(),
|
||||
MultiAddress.init("/ip6/::/tcp/0/").tryGet(),
|
||||
],
|
||||
false,
|
||||
)
|
||||
.withAddresses(addrs, false)
|
||||
.withTcpTransport()
|
||||
.withMplex()
|
||||
.withNoise()
|
||||
@@ -63,19 +56,19 @@ suite "WildcardAddressResolverService":
|
||||
asyncTest "WildcardAddressResolverService must resolve wildcard addresses and stop doing so when stopped":
|
||||
let svc: Service =
|
||||
WildcardAddressResolverService.new(networkInterfaceProvider = getAddressesMock)
|
||||
let switch = createSwitch(svc)
|
||||
let switch = createSwitch(
|
||||
svc,
|
||||
@[
|
||||
MultiAddress.init("/ip4/127.0.0.1/tcp/0/").tryGet(),
|
||||
MultiAddress.init("/ip4/0.0.0.0/tcp/0/").tryGet(),
|
||||
MultiAddress.init("/ip6/::/tcp/0/").tryGet(),
|
||||
],
|
||||
)
|
||||
await switch.start()
|
||||
let tcpIp4Locahost = switch.peerInfo.addrs[0][multiCodec("tcp")].get
|
||||
let tcpIp4Wildcard = switch.peerInfo.addrs[1][multiCodec("tcp")].get
|
||||
let tcpIp6 = switch.peerInfo.addrs[2][multiCodec("tcp")].get # tcp port for ip6
|
||||
let tcpIp6 = switch.peerInfo.addrs[3][multiCodec("tcp")].get # tcp port for ip6
|
||||
|
||||
check switch.peerInfo.addrs ==
|
||||
@[
|
||||
MultiAddress.init("/ip4/127.0.0.1" & $tcpIp4Locahost).get,
|
||||
MultiAddress.init("/ip4/0.0.0.0" & $tcpIp4Wildcard).get,
|
||||
MultiAddress.init("/ip6/::" & $tcpIp6).get,
|
||||
]
|
||||
await svc.run(switch)
|
||||
check switch.peerInfo.addrs ==
|
||||
@[
|
||||
MultiAddress.init("/ip4/127.0.0.1" & $tcpIp4Locahost).get,
|
||||
@@ -83,6 +76,9 @@ suite "WildcardAddressResolverService":
|
||||
MultiAddress.init("/ip4/192.168.1.22" & $tcpIp4Wildcard).get,
|
||||
MultiAddress.init("/ip6/::1" & $tcpIp6).get,
|
||||
MultiAddress.init("/ip6/fe80::1" & $tcpIp6).get,
|
||||
# IPv6 dual stack
|
||||
MultiAddress.init("/ip4/127.0.0.1" & $tcpIp6).get,
|
||||
MultiAddress.init("/ip4/192.168.1.22" & $tcpIp6).get,
|
||||
]
|
||||
await switch.stop()
|
||||
check switch.peerInfo.addrs ==
|
||||
|
||||
Reference in New Issue
Block a user