Compare commits

...

29 Commits

Author SHA1 Message Date
diegomrsantos
1fa30f07e8 chore(ci): add arm64 for macOS (#1212)
This PR adds the macOS 14 GitHub runner, which uses an arm64 CPU.
2024-12-20 21:18:56 -04:00
richΛrd
39d0451a10 chore: validate PR titles and commits, and autoassign PRs (#1227) 2024-12-20 15:42:54 +01:00
richΛrd
4dc7a89f45 chore: use latest nimpng and nico (#1229) 2024-12-17 14:37:06 -04:00
richΛrd
fd26f93b80 fix(ci): use nim 2.0 branch (#1228) 2024-12-09 19:41:08 +00:00
Etan Kissling
dd2c74d413 feat(nameresolving): Add {.async: (raises).} annotations (#1214)
Modernize `nameresolving` modules to track `{.async: (raises).}`.
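
As an illustration, a minimal hedged sketch of the annotation style these modules now track (the proc and names below are invented for the example, not taken from the PR):

```nim
import chronos

proc lookupTxt(host: string): Future[seq[string]] {.
    async: (raises: [CancelledError, ValueError])
.} =
  ## Only the listed exception types may escape this async proc;
  ## anything else becomes a compile-time error under chronos v4.
  if host.len == 0:
    raise newException(ValueError, "empty hostname")
  return @["dnsaddr=/dns4/" & host]
```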

---------

Co-authored-by: kaiserd <1684595+kaiserd@users.noreply.github.com>
Co-authored-by: richΛrd <info@richardramos.me>
2024-12-09 12:43:21 +01:00
Eugene Kabanov
b7e0df127f Fix PeerStore missing remote endpoints of established connection. (#1226) 2024-11-27 14:49:41 +02:00
kaiserd
f591e692fc chore(version): update libp2p.nimble to 1.7.1 (#1223)
Minor version bump for the rendezvous fix.
2024-11-09 10:51:33 +07:00
NagyZoltanPeter
8855bce085 fix: missing raises pragma for one of RendezVous.new (#1222) 2024-11-08 11:11:51 +01:00
kaiserd
ed5670408b chore(version): update libp2p.nimble to 1.7.0 (#1213)
This commit is planned to be tagged with v1.7.0.
The only feature added in this version is configurable min and max TTLs
for rendezvous.

Co-authored-by: ksr <kaiserd@users.noreply.github.com>
2024-11-03 18:15:33 +00:00
Álex
97192a3c80 fix(ci): nim 2.0 dependency failure (#1216)
Broken due to an update to Nimble 0.16.2 for the version-2-0 branch.
This PR sets the ref to the previous commit. It also updates the key
naming so it fits better.

Nim's commit list: https://github.com/nim-lang/Nim/commits/version-2-0/

# Important Note
In this PR, some required jobs' names were changed. They need to be
updated at the repository level before this PR can be merged.
2024-10-31 17:56:13 +00:00
Álex
294d06323c docs(test): handle IHAVE / IWANT tests (#1202)
Add documentation as requested.
2024-10-31 17:23:24 +00:00
Álex
a3b8729cbe fix(ci): Daily workflows report (#1200)
Fix daily workflows' green tick when jobs failed.

Based off of: https://stackoverflow.com/a/58859404

Closes:  https://github.com/vacp2p/nim-libp2p/issues/1197

---------

Co-authored-by: Diego <diego@status.im>
2024-10-10 12:10:49 +00:00
Álex
6c970911f2 fix(CI): free disk space on interop transport job (#1206)
Re-add the free disk space job, as space issues still happen when
building images.

---------

Co-authored-by: Diego <diego@status.im>
2024-10-10 09:42:25 +00:00
Álex
5d48776b02 chore(ci): Enable S3 caching for interop (#1193)
- Add our S3 bucket for caching Docker images, as Protocol Labs shut
down their shared one.
- Remove the free disk space workaround that prevented the jobs from
failing due to the images using too much space.

---------

Co-authored-by: diegomrsantos <diego@status.im>
2024-09-26 09:56:09 +00:00
Simon-Pierre Vivier
d389d96789 feat: rendezvous refactor (#1183)
Hello!

This PR aims to refactor the rendezvous code so that it is easier to
implement the Waku rendezvous strategy. The hardcoded min and max TTL
were out of range for what we needed, and specifying which peers to
interact with is also needed since Waku deals with peers on multiple
separate shards.

I tried to keep the changes to a minimum; specifically, I did not change
the name of any public procs, which results in less-than-descriptive
names in some cases. I also wanted to return Results instead of raising
exceptions but didn't. Would it be acceptable to do so?

Please advise on best practices, thank you.
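
For reference, a minimal sketch of the two error-handling styles weighed above (the proc names and TTL check are illustrative, not from the rendezvous code):

```nim
import results

# Raising style: callers must wrap the call in try/except.
proc registerRaising(ttl: int): int {.raises: [ValueError].} =
  if ttl <= 0:
    raise newException(ValueError, "TTL out of range")
  ttl

# Result style: the error becomes part of the return type.
proc registerResult(ttl: int): Result[int, string] =
  if ttl <= 0:
    return err("TTL out of range")
  ok(ttl)

echo registerResult(0).isErr # true
```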

---------

Co-authored-by: Ludovic Chenut <ludovic@status.im>
2024-09-25 09:11:57 +00:00
kaiserd
09fe199b6b chore(version): update libp2p.nimble to 1.6.0 (#1196)
Updating libp2p.nimble to 1.6.0.
This commit is planned to be tagged with 1.6.0.

The only new feature in this version is the highly experimental QUIC
transport. Still, it is a feature and justifies a minor version upgrade.
It also contains a few fixes and improvements (see commit history).

Co-authored-by: ksr <kaiserd@users.noreply.github.com>
2024-09-19 20:06:45 +00:00
diegomrsantos
68306cf1f1 chore: fix devel compilation issues (#1195)
- fixes https://github.com/vacp2p/nim-libp2p/issues/1194.
- fixes ambiguous `KeyError`
- removes an unnecessary type param for `newSeqWith`
- fixes tests for `safeConvert`

The main fixes relate to Nim 2.2 being stricter and rejecting calls
with the wrong number of type parameters.
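
A hedged illustration of the kind of call Nim 2.2 rejects; the exact call sites differ, and `newSeqWith` here is the std/sequtils template, which declares no explicit type parameter:

```nim
import std/sequtils

# Nim 2.2 errors out when an explicit type parameter is passed to a
# routine that doesn't declare one, e.g.:
#   let buffers = newSeqWith[seq[byte]](4, newSeq[byte]())
# Dropping the redundant parameter lets the element type be inferred:
let buffers = newSeqWith(4, newSeq[byte]())
echo buffers.len # 4
```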
2024-09-19 11:35:50 +00:00
Tanguy
b37133ca43 feat(transport): add experimental QUIC Transport (not production ready) (#725)
Our QUIC effort is blocked by bearssl not supporting TLS 1.3, but since
Mark did most of the work to implement QUIC here:
https://github.com/status-im/nim-libp2p/pull/563 and on nim-quic, this
PR brings encryption-less QUIC into nim-libp2p.
This allows us to test it and make sure it doesn't bitrot.

Heavily WiP:
- [X] Extract code from #563
- [X] Create custom muxer & upgrader
- [X] Basic E2E switch test working
- [x] Update nim-quic to get address information in libp2p (for
`observed address` and port 0 resolving)
- [ ] More tests
- [ ] Cleanup

Since encryption is not yet supported, we're not compatible with any
other libp2p implementations and have to rely on home-made protocols to
retrieve the peer's ID.

---------

Co-authored-by: markspanbroek <mark@spanbroek.net>
Co-authored-by: Diego <diego@status.im>
2024-09-12 09:32:14 +00:00
diegomrsantos
3e3df07269 chore: add support to merge queues (#1192)
This change is necessary before enabling merge queues. The `merge_group`
event is needed to trigger the GitHub Actions workflow when a pull
request is added to a merge queue.

The merge queue provides the same benefits as the "Require branches to be
up to date before merging" branch protection rule, but does not require a
pull request author to update their pull request branch and wait for status
checks to finish before trying to merge. More info at
https://docs.github.com/en/repositories/configuring-branches-and-merges-in-your-repository/configuring-pull-request-merges/managing-a-merge-queue.
2024-09-10 16:10:24 +00:00
Etan Kissling
1771534030 fix(multiaddress): Raise MaError instead of LPError on & (#1145)
`LPError` is the top-level error type of libp2p; it makes more sense to
raise a multiaddress-specific subtype so that callers are not required to
catch errors too broadly. As `MaError` inherits from `LPError`, existing
error handlers will still work fine.
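
A small hedged sketch of why broad handlers keep working (the addresses are illustrative; `LPError` and `MaError` come from libp2p's error and multiaddress modules):

```nim
import libp2p/multiaddress

let ip = MultiAddress.init("/ip4/127.0.0.1").tryGet()
let port = MultiAddress.init("/tcp/4001").tryGet()

try:
  echo ip & port         # `&` now raises MaError on an invalid concatenation
except LPError as exc:   # broad handler: still catches the more specific MaError
  echo "concat failed: ", exc.msg
```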

---------

Co-authored-by: diegomrsantos <diego@status.im>
2024-09-10 13:24:58 +00:00
diegomrsantos
21a444197c chore: move mm:refc config and remove skipParentCfg (#1190)
- Moving the `mm:refc` configuration to the main `config.nims` file
means it is also used in tests and in tasks that build examples.
- It's not clear why `skipParentCfg` was being used, and it doesn't seem
to be necessary.
2024-09-10 13:51:54 +02:00
Ivan FB
966996542e chore(connmanager): also show peerID and dir when too many conns (#1185)
Bring a bit more detail when the "Too many connections for peer" message
is logged. Specifically, we add the offending `peerId` and the stream
direction.

Co-authored-by: diegomrsantos <diego@status.im>
2024-09-09 13:52:01 +00:00
diegomrsantos
8070b21825 fix(transport): tcp accept fixes (#1170)
Address the comments in https://github.com/vacp2p/nim-libp2p/pull/1164
2024-09-09 11:49:33 +00:00
Cofson
d98152f266 Create funding.json (#1188) 2024-09-05 18:25:50 +02:00
diegomrsantos
47a51983b5 chore(CI): rollback nph change (#1187)
Rolls back to the previous `nph` command, which shows which files aren't
formatted.
2024-09-03 09:01:35 +00:00
Álex
70754cd575 ci: Enable conditional SAT solving (#1177)
* Add conditional SAT dependency solving to a new daily job.

closes: https://github.com/vacp2p/nim-libp2p/issues/1174
2024-09-02 15:33:16 +02:00
tersec
a1811e7395 fix(transport): libp2p compilation in Nim version-2-0 and devel (#1186)
For example:
```
/Users/runner/work/nim-libp2p/nim-libp2p/nimbledeps/pkgs2/websock-0.1.0-94f836ae589056b2deb04bdfdcd614fff80adaf5/websock/http/client.nim(173, 5) template/generic instantiation of `async` from here
/Users/runner/work/nim-libp2p/nim-libp2p/nimbledeps/pkgs2/websock-0.1.0-94f836ae589056b2deb04bdfdcd614fff80adaf5/websock/http/client.nim(165, 1) Warning: The raises pragma doesn't work on async procedures - use `async: (raises: [...]) instead. [User]
/Users/runner/work/nim-libp2p/nim-libp2p/nimbledeps/pkgs2/websock-0.1.0-94f836ae589056b2deb04bdfdcd614fff80adaf5/websock/websock.nim(257, 5) template/generic instantiation of `async` from here
/Users/runner/work/nim-libp2p/nim-libp2p/nimbledeps/pkgs2/websock-0.1.0-94f836ae589056b2deb04bdfdcd614fff80adaf5/websock/websock.nim(251, 1) Warning: The raises pragma doesn't work on async procedures - use `async: (raises: [...]) instead. [User]
/Users/runner/work/nim-libp2p/nim-libp2p/libp2p/transports/wstransport.nim(77, 18) template/generic instantiation of `async` from here
/Users/runner/work/nim-libp2p/nim-libp2p/libp2p/transports/wstransport.nim(83, 10) template/generic instantiation of `setResult` from here
/Users/runner/work/nim-libp2p/nim-libp2p/libp2p/transports/wstransport.nim(78, 26) template/generic instantiation of `mapExceptions` from here
/Users/runner/work/nim-libp2p/nim-libp2p/nimbledeps/pkgs2/chronos-4.0.2-c5e9517b9189713210e2abab8b77a68da71ded12/chronos/internal/asyncmacro.nim(542, 60) Error: expression 'value(cast[type(recv(s.session, pbytes, nbytes))](chronosInternalRetFuture.internalChild))' is of type 'int' and has to be used (or discarded); start of expression here: /Users/runner/work/nim-libp2p/nim-libp2p/libp2p/transports/wstransport.nim(78, 26)
stack trace: (most recent call last)
```
from
https://github.com/vacp2p/nim-libp2p/actions/runs/10655841970/job/29533846606?pr=1145

For minimal example of this:
```nim
template g(body: untyped) =
  try:
    body
  except CatchableError:
    raise newException(CatchableError, "")

discard g(0)
```

Also, even in 2.0.8, a variation doesn't work:
```
template g(body: untyped) = body
discard g(0)
```
2024-09-01 21:49:41 +00:00
Álex
c6e8fadbda fix(ci): Daily workflow parent's name (#1182)
* Fix daily workflows' parent's name.
2024-08-23 15:57:08 +02:00
Álex
48846d69cb chore(logs): remove duplicate msg key (#1180)
* Remove `msg` parameter key in logs.

closes: https://github.com/vacp2p/nim-libp2p/issues/1176
2024-08-14 17:19:54 +02:00
70 changed files with 1079 additions and 377 deletions

View File

@@ -6,7 +6,7 @@ inputs:
cpu:
description: "CPU to build for"
default: "amd64"
nim_branch:
nim_ref:
description: "Nim version"
default: "version-1-6"
shell:
@@ -88,6 +88,8 @@ runs:
run: |
if [[ '${{ inputs.cpu }}' == 'amd64' ]]; then
PLATFORM=x64
elif [[ '${{ inputs.cpu }}' == 'arm64' ]]; then
PLATFORM=arm64
else
PLATFORM=x86
fi
@@ -117,7 +119,7 @@ runs:
uses: actions/cache@v4
with:
path: '${{ github.workspace }}/nim'
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_branch }}-cache-${{ env.cache_nonce }}
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_ref }}-cache-${{ env.cache_nonce }}
- name: Build Nim and Nimble
shell: ${{ inputs.shell }}
@@ -126,6 +128,6 @@ runs:
# We don't want partial matches of the cache restored
rm -rf nim
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ inputs.nim_branch }} \
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ inputs.nim_ref }} \
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
bash build_nim.sh nim csources dist/nimble NimBinaries

12
.github/scripts/colors.sh vendored Normal file
View File

@@ -0,0 +1,12 @@
#!/usr/bin/env bash
# Colors
export YLW='\033[1;33m'
export RED='\033[0;31m'
export GRN='\033[0;32m'
export BLU='\033[0;34m'
export BLD='\033[1m'
export RST='\033[0m'
# Clear line
export CLR='\033[2K'

4
.github/scripts/commit_check.sh vendored Executable file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/env bash
source .github/scripts/parse_commits.sh
parse_commits "$@"

30
.github/scripts/parse_commits.sh vendored Normal file
View File

@@ -0,0 +1,30 @@
#!/usr/bin/env bash
# The output of this script is as follows:
# 1. One line "checking commits between: <start_commit> <end_commit>"
# 2. One line for each commit message that is not well-formed
set -euo pipefail
source .github/scripts/colors.sh
parse_commits() {
BASE_BRANCH=${BASE_BRANCH:-master}
start_commit=${1:-origin/${BASE_BRANCH}}
end_commit=${2:-HEAD}
exit_code=0
echo -e "${GRN}Checking commits between:${RST} $start_commit $end_commit"
# Run the loop in the current shell using process substitution
while IFS= read -r message || [ -n "$message" ]; do
# Check if commit message follows conventional commits format
if [[ ! $message =~ ^(build|chore|ci|docs|feat|fix|perf|refactor|revert|style|test)(\(.*\))?:.*$ ]]; then
echo -e "${YLW}Commit message is ill-formed:${RST} $message"
exit_code=1
fi
done < <(git log --format=%s "$start_commit".."$end_commit")
exit ${exit_code}
}

12
.github/workflows/auto_assign_pr.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
name: Auto Assign PR to Creator
on:
pull_request:
types:
- opened
jobs:
assign_creator:
runs-on: ubuntu-latest
steps:
- uses: toshimaru/auto-author-assign@v1.6.2

View File

@@ -5,6 +5,7 @@ on:
branches:
- master
pull_request:
merge_group:
workflow_dispatch:
concurrency:
@@ -26,12 +27,14 @@ jobs:
cpu: amd64
- os: macos
cpu: amd64
- os: macos-14
cpu: arm64
- os: windows
cpu: amd64
nim:
- branch: version-1-6
nim:
- ref: version-1-6
memory_management: refc
- branch: version-2-0
- ref: version-2-0
memory_management: refc
include:
- platform:
@@ -46,6 +49,10 @@ jobs:
os: macos
builder: macos-13
shell: bash
- platform:
os: macos-14
builder: macos-14
shell: bash
- platform:
os: windows
builder: windows-2022
@@ -55,7 +62,7 @@ jobs:
run:
shell: ${{ matrix.shell }}
name: '${{ matrix.platform.os }}-${{ matrix.platform.cpu }} (Nim ${{ matrix.nim.branch }})'
name: '${{ matrix.platform.os }}-${{ matrix.platform.cpu }} (Nim ${{ matrix.nim.ref }})'
runs-on: ${{ matrix.builder }}
steps:
- name: Checkout
@@ -69,12 +76,12 @@ jobs:
os: ${{ matrix.platform.os }}
cpu: ${{ matrix.platform.cpu }}
shell: ${{ matrix.shell }}
nim_branch: ${{ matrix.nim.branch }}
nim_ref: ${{ matrix.nim.ref }}
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '~1.15.5'
go-version: '~1.16.0' # That's the minimum Go version that works with arm.
- name: Install p2pd
run: |
@@ -85,9 +92,9 @@ jobs:
uses: actions/cache@v3
with:
path: nimbledeps
# Using nim.branch as a simple way to differentiate between nimble using the "pkgs" or "pkgs2" directories.
# The change happened on Nimble v0.14.0.
key: nimbledeps-${{ matrix.nim.branch }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
# Using nim.ref as a simple way to differentiate between nimble using the "pkgs" or "pkgs2" directories.
# The change happened on Nimble v0.14.0. Also forcing the deps to be reinstalled on each os and cpu.
key: nimbledeps-${{ matrix.nim.ref }}-${{ matrix.builder }}-${{ matrix.platform.cpu }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
- name: Install deps
if: ${{ steps.deps-cache.outputs.cache-hit != 'true' }}
@@ -108,5 +115,6 @@ jobs:
nim --version
nimble --version
gcc --version
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }}"
nimble test

77
.github/workflows/commit_lint.yml vendored Normal file
View File

@@ -0,0 +1,77 @@
name: "Conventional Commits"
on:
pull_request:
types:
- opened
- edited
- reopened
- synchronize
jobs:
main:
name: Validate commit messages
runs-on: ubuntu-latest
permissions:
pull-requests: write
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
ref: ${{github.event.pull_request.head.ref}}
repository: ${{github.event.pull_request.head.repo.full_name}}
- name: Check commit message
id: check_commit_message
if: always()
run: |
set +e
base_sha=${{ github.event.pull_request.base.sha }}
head_sha=${{ github.event.pull_request.head.sha }}
output=$(.github/scripts/commit_check.sh "${base_sha}" "${head_sha}" 2>&1)
exit_code=$?
echo "${output}" | sed '$d'
echo "exit_code=${exit_code}" >> $GITHUB_OUTPUT
invalid_commit_messages=$(echo "${output}" | sed '1d;$d')
invalid_commit_messages=$(echo "${output}" | sed '1d;$d')
invalid_commit_messages=$(echo "${invalid_commit_messages}" | sed 's/\x1b\[[0-9;]*m//g') # Remove color codes
invalid_commit_messages=$(echo "${invalid_commit_messages}" | sed 's/^Commit message is ill-formed: //') # Remove prefix
if [[ $exit_code -ne 0 ]]; then
EOF=$(dd if=/dev/urandom bs=15 count=1 status=none | base64)
echo "error_message<<$EOF" >> "$GITHUB_ENV"
echo "${invalid_commit_messages}" >> "$GITHUB_ENV"
echo "$EOF" >> "$GITHUB_ENV"
fi
- name: "Publish failed commit messages"
uses: marocchino/sticky-pull-request-comment@v2
# When the previous steps fails, the workflow would stop. By adding this
# condition you can continue the execution with the populated error message.
if: always() && (steps.check_commit_message.outputs.exit_code != 0)
with:
header: commit-message-lint-error
message: |
Commits must follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/)
Please fix these commit messages:
```
${{ env.error_message }}
```
# Delete a previous comment when the issue has been resolved
- name: "Delete previous comment"
if: ${{ steps.check_commit_message.outputs.exit_code == 0 }}
uses: marocchino/sticky-pull-request-comment@v2
with:
header: commit-message-lint-error
delete: true
- name: "Mark as failed"
if: steps.check_commit_message.outputs.exit_code != 0
uses: actions/github-script@v7
with:
script: |
core.setFailed("Some commit messages are ill-formed")

View File

@@ -6,6 +6,7 @@ on:
branches:
- master
pull_request:
merge_group:
workflow_dispatch:
concurrency:

View File

@@ -8,7 +8,7 @@ on:
jobs:
test_amd64:
name: Daily amd64
uses: ./.github/workflows/base_daily_tests.yml
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'branch': 'version-1-6', 'memory_management': 'refc'}, {'branch': 'version-2-0', 'memory_management': 'refc'}]"
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': 'version-2-0', 'memory_management': 'refc'}]"
cpu: "['amd64']"

View File

@@ -7,7 +7,7 @@ on:
nim:
description: 'Nim Configuration'
required: true
type: string # Following this format: [{"branch": ..., "memory_management": ...}, ...]
type: string # Following this format: [{"ref": ..., "memory_management": ...}, ...]
cpu:
description: 'CPU'
required: true
@@ -17,6 +17,11 @@ on:
required: false
type: string
default: "[]"
use_sat_solver:
description: 'Install dependencies with SAT Solver'
required: false
type: boolean
default: false
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -53,9 +58,8 @@ jobs:
run:
shell: ${{ matrix.platform.shell }}
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.branch }})'
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.ref }})'
runs-on: ${{ matrix.platform.builder }}
continue-on-error: ${{ matrix.nim.branch == 'devel' || matrix.nim.branch == 'version-2-0' }}
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -65,13 +69,13 @@ jobs:
with:
os: ${{ matrix.platform.os }}
shell: ${{ matrix.platform.shell }}
nim_branch: ${{ matrix.nim.branch }}
nim_ref: ${{ matrix.nim.ref }}
cpu: ${{ matrix.cpu }}
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '~1.15.5'
go-version: '~1.16.0'
cache: false
- name: Install p2pd
@@ -86,4 +90,12 @@ jobs:
run: |
nim --version
nimble --version
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }}" nimble test
if [[ "${{ inputs.use_sat_solver }}" == "true" ]]; then
dependency_solver="sat"
else
dependency_solver="legacy"
fi
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }} --solver:${dependency_solver}"
nimble test

View File

@@ -8,7 +8,7 @@ on:
jobs:
test_nim_devel:
name: Daily Nim Devel
uses: ./.github/workflows/base_daily_tests.yml
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'branch': 'devel', 'memory_management': 'orc'}]"
nim: "[{'ref': 'devel', 'memory_management': 'orc'}]"
cpu: "['amd64']"

View File

@@ -8,8 +8,8 @@ on:
jobs:
test_i386:
name: Daily i386 (Linux)
uses: ./.github/workflows/base_daily_tests.yml
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'branch': 'version-1-6', 'memory_management': 'refc'}, {'branch': 'version-2-0', 'memory_management': 'refc'}, {'branch': 'devel', 'memory_management': 'orc'}]"
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': 'version-2-0', 'memory_management': 'refc'}, {'ref': 'devel', 'memory_management': 'orc'}]"
cpu: "['i386']"
exclude: "[{'platform': {'os':'macos'}}, {'platform': {'os':'windows'}}]"

15
.github/workflows/daily_sat.yml vendored Normal file
View File

@@ -0,0 +1,15 @@
name: Daily SAT
on:
schedule:
- cron: "30 6 * * *"
workflow_dispatch:
jobs:
test_amd64:
name: Daily SAT
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'ref': 'version-2-0', 'memory_management': 'refc'}]"
cpu: "['amd64']"
use_sat_solver: true

View File

@@ -2,6 +2,7 @@ name: Interoperability Tests
on:
pull_request:
merge_group:
push:
branches:
- master
@@ -16,8 +17,9 @@ jobs:
name: Run transport interoperability tests
runs-on: ubuntu-22.04
steps:
- name: Free Disk Space (Ubuntu)
# For some reason the original job (libp2p/test-plans) has enough disk space, but this one doesn't.
- name: Free Disk Space
# For some reason we have space issues while running this action. Likely while building the image.
# This action will free up some space to avoid the issue.
uses: jlumbroso/free-disk-space@v1.3.1
with:
tool-cache: true
@@ -31,6 +33,10 @@ jobs:
with:
test-filter: nim-libp2p-head
extra-versions: ${{ github.workspace }}/tests/transport-interop/version.json
s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }}
s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }}
s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }}
run-hole-punching-interop:
name: Run hole-punching interoperability tests
@@ -45,3 +51,7 @@ jobs:
with:
test-filter: nim-libp2p-head
extra-versions: ${{ github.workspace }}/tests/hole-punching-interop/version.json
s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }}
s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }}
s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }}

View File

@@ -2,6 +2,7 @@ name: Linters
on:
pull_request:
merge_group:
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -29,4 +30,5 @@ jobs:
- name: Check style
run: |
shopt -s extglob # Enable extended globbing
./nph --check examples libp2p tests tools *.@(nim|nims|nimble)
./nph examples libp2p tests tools *.@(nim|nims|nimble)
git diff --exit-code

35
.github/workflows/pr_lint.yml vendored Normal file
View File

@@ -0,0 +1,35 @@
name: "Conventional Commits"
on:
pull_request:
types:
- opened
- edited
- reopened
- synchronize
jobs:
main:
name: Validate PR title
runs-on: ubuntu-latest
permissions:
pull-requests: write
steps:
- uses: amannn/action-semantic-pull-request@v5
id: lint_pr_title
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- uses: marocchino/sticky-pull-request-comment@v2
# When the previous steps fails, the workflow would stop. By adding this
# condition you can continue the execution with the populated error message.
if: always() && (steps.lint_pr_title.outputs.error_message != null)
with:
header: pr-title-lint-error
message: |
Pull requests titles must follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/)
# Delete a previous comment when the issue has been resolved
- if: ${{ steps.lint_pr_title.outputs.error_message == null }}
uses: marocchino/sticky-pull-request-comment@v2
with:
header: pr-title-lint-error
delete: true

View File

@@ -1,12 +1,14 @@
bearssl;https://github.com/status-im/nim-bearssl@#667b40440a53a58e9f922e29e20818720c62d9ac
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
chronos;https://github.com/status-im/nim-chronos@#dc3847e4d6733dfc3811454c2a9c384b87343e26
chronos;https://github.com/status-im/nim-chronos@#c04576d829b8a0a1b12baaa8bc92037501b3a4a0
dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8
faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309
httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
ngtcp2;https://github.com/status-im/nim-ngtcp2@#6834f4756b6af58356ac9c4fef3d71db3c3ae5fe
nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
quic;https://github.com/status-im/nim-quic.git@#ddcb31ffb74b5460ab37fd13547eca90594248bc
results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad
secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be
serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35

View File

@@ -12,6 +12,9 @@ switch("warning", "LockLevel:off")
switch("warningAsError", "UseBase:on")
--styleCheck:
error
--mm:
refc
# reconsider when there's a version-2-2 branch worth testing with as we might switch to orc
# Avoid some rare stack corruption while using exceptions with a SEH-enabled
# toolchain: https://github.com/status-im/nimbus-eth2/issues/3121

View File

@@ -13,7 +13,7 @@ For more information about the go daemon, check out [this repository](https://gi
> **Required only** for running the tests.
# Prerequisites
Go with version `1.15.15`.
Go with version `1.16.0`.
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.
# Installation
@@ -21,7 +21,7 @@ Follow one of the methods below:
## Script
Run the build script while having the `go` command pointing to the correct Go version.
We recommend using `1.15.15`, as previously stated.
We recommend using `1.16.0`, as previously stated.
```sh
./scripts/build_p2pd.sh
```

5
funding.json Normal file
View File

@@ -0,0 +1,5 @@
{
"opRetro": {
"projectId": "0xc9561ba3e4eca5483b40f8b1a254a73c91fefe4f8aee32dc20c0d96dcf33fe80"
}
}

View File

@@ -52,6 +52,7 @@ else:
stream/connection,
transports/transport,
transports/tcptransport,
transports/quictransport,
protocols/secure/noise,
cid,
multihash,

View File

@@ -1,16 +1,17 @@
mode = ScriptMode.Verbose
packageName = "libp2p"
version = "1.5.0"
version = "1.7.1"
author = "Status Research & Development GmbH"
description = "LibP2P implementation"
license = "MIT"
skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
requires "nim >= 1.6.0",
"nimcrypto >= 0.4.1", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
"chronicles >= 0.10.2", "chronos >= 4.0.2", "metrics", "secp256k1", "stew#head",
"websock", "unittest2"
"nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
"chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew#head",
"websock", "unittest2",
"https://github.com/status-im/nim-quic.git#ddcb31ffb74b5460ab37fd13547eca90594248bc"
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)
@@ -19,8 +20,8 @@ let verbose = getEnv("V", "") notin ["", "0"]
let cfg =
" --styleCheck:usages --styleCheck:error" &
(if verbose: "" else: " --verbosity:0 --hints:off") &
" --skipParentCfg --skipUserCfg -f" & " --threads:on --opt:speed"
(if verbose: "" else: " --verbosity:0 --hints:off") & " --skipUserCfg -f" &
" --threads:on --opt:speed"
import hashes, strutils
@@ -124,9 +125,8 @@ task examples_build, "Build the samples":
buildSample("tutorial_3_protobuf", true)
buildSample("tutorial_4_gossipsub", true)
buildSample("tutorial_5_discovery", true)
exec "nimble install -y nimpng@#HEAD"
# this is to fix broken build on 1.7.3, remove it when nimpng version 0.3.2 or later is released
exec "nimble install -y nico@#af99dd60bf2b395038ece815ea1012330a80d6e6"
exec "nimble install -y nimpng"
exec "nimble install -y nico --passNim=--skipParentCfg"
buildSample("tutorial_6_game", false, "--styleCheck:off")
# pin system

View File

@@ -136,7 +136,8 @@ proc triggerConnEvent*(c: ConnManager, peerId: PeerId, event: ConnEvent) {.async
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "Exception in triggerConnEvents", msg = exc.msg, peer = peerId, event = $event
warn "Exception in triggerConnEvents",
description = exc.msg, peer = peerId, event = $event
proc addPeerEventHandler*(
c: ConnManager, handler: PeerEventHandler, kind: PeerEventKind
@@ -169,7 +170,7 @@ proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.asyn
except CancelledError as exc:
raise exc
except CatchableError as exc: # handlers should not raise!
warn "Exception in triggerPeerEvents", exc = exc.msg, peer = peerId
warn "Exception in triggerPeerEvents", description = exc.msg, peer = peerId
proc expectConnection*(
c: ConnManager, p: PeerId, dir: Direction
@@ -212,7 +213,7 @@ proc closeMuxer(muxer: Muxer) {.async.} =
try:
await muxer.handler # TODO noraises?
except CatchableError as exc:
trace "Exception in close muxer handler", exc = exc.msg
trace "Exception in close muxer handler", description = exc.msg
trace "Cleaned up muxer", m = muxer
proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
@@ -235,7 +236,7 @@ proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
except CatchableError as exc:
# This is top-level procedure which will work as separate task, so it
# do not need to propagate CancelledError and should handle other errors
warn "Unexpected exception peer cleanup handler", mux, msg = exc.msg
warn "Unexpected exception peer cleanup handler", mux, description = exc.msg
proc onClose(c: ConnManager, mux: Muxer) {.async.} =
## connection close even handler
@@ -246,7 +247,8 @@ proc onClose(c: ConnManager, mux: Muxer) {.async.} =
await mux.connection.join()
trace "Connection closed, cleaning up", mux
except CatchableError as exc:
debug "Unexpected exception in connection manager's cleanup", errMsg = exc.msg, mux
debug "Unexpected exception in connection manager's cleanup",
description = exc.msg, mux
finally:
await c.muxCleanup(mux)
@@ -294,7 +296,8 @@ proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =
if expectedConn != nil and not expectedConn.finished:
expectedConn.complete(muxer)
else:
debug "Too many connections for peer", conns = c.muxed.getOrDefault(peerId).len
debug "Too many connections for peer",
conns = c.muxed.getOrDefault(peerId).len, peerId, dir
raise newTooManyConnectionsError()
@@ -358,7 +361,7 @@ proc trackConnection*(cs: ConnectionSlot, conn: Connection) =
try:
await conn.join()
except CatchableError as exc:
trace "Exception in semaphore monitor, ignoring", exc = exc.msg
trace "Exception in semaphore monitor, ignoring", description = exc.msg
cs.release()

View File

@@ -198,7 +198,7 @@ proc random*(
case scheme
of PKScheme.RSA:
when supported(PKScheme.RSA):
let rsakey = ?RsaPrivateKey.random(rng, bits).orError(KeyError)
let rsakey = ?RsaPrivateKey.random(rng, bits).orError(CryptoError.KeyError)
ok(PrivateKey(scheme: scheme, rsakey: rsakey))
else:
err(SchemeError)
@@ -210,7 +210,8 @@ proc random*(
err(SchemeError)
of PKScheme.ECDSA:
when supported(PKScheme.ECDSA):
let eckey = ?ecnist.EcPrivateKey.random(Secp256r1, rng).orError(KeyError)
let eckey =
?ecnist.EcPrivateKey.random(Secp256r1, rng).orError(CryptoError.KeyError)
ok(PrivateKey(scheme: scheme, eckey: eckey))
else:
err(SchemeError)
@@ -237,10 +238,11 @@ proc random*(
let skkey = SkPrivateKey.random(rng)
ok(PrivateKey(scheme: PKScheme.Secp256k1, skkey: skkey))
elif supported(PKScheme.RSA):
let rsakey = ?RsaPrivateKey.random(rng, bits).orError(KeyError)
let rsakey = ?RsaPrivateKey.random(rng, bits).orError(CryptoError.KeyError)
ok(PrivateKey(scheme: PKScheme.RSA, rsakey: rsakey))
elif supported(PKScheme.ECDSA):
let eckey = ?ecnist.EcPrivateKey.random(Secp256r1, rng).orError(KeyError)
let eckey =
?ecnist.EcPrivateKey.random(Secp256r1, rng).orError(CryptoError.KeyError)
ok(PrivateKey(scheme: PKScheme.ECDSA, eckey: eckey))
else:
err(SchemeError)
@@ -258,7 +260,7 @@ proc random*(
case scheme
of PKScheme.RSA:
when supported(PKScheme.RSA):
let pair = ?RsaKeyPair.random(rng, bits).orError(KeyError)
let pair = ?RsaKeyPair.random(rng, bits).orError(CryptoError.KeyError)
ok(
KeyPair(
seckey: PrivateKey(scheme: scheme, rsakey: pair.seckey),
@@ -280,7 +282,7 @@ proc random*(
err(SchemeError)
of PKScheme.ECDSA:
when supported(PKScheme.ECDSA):
let pair = ?EcKeyPair.random(Secp256r1, rng).orError(KeyError)
let pair = ?EcKeyPair.random(Secp256r1, rng).orError(CryptoError.KeyError)
ok(
KeyPair(
seckey: PrivateKey(scheme: scheme, eckey: pair.seckey),
@@ -583,7 +585,7 @@ proc init*(t: typedesc[PrivateKey], data: openArray[byte]): CryptoResult[Private
## Create new private key from libp2p's protobuf serialized binary form.
var res: t
if not res.init(data):
err(KeyError)
err(CryptoError.KeyError)
else:
ok(res)
@@ -591,7 +593,7 @@ proc init*(t: typedesc[PublicKey], data: openArray[byte]): CryptoResult[PublicKe
## Create new public key from libp2p's protobuf serialized binary form.
var res: t
if not res.init(data):
err(KeyError)
err(CryptoError.KeyError)
else:
ok(res)

View File

@@ -62,10 +62,12 @@ proc dialAndUpgrade(
libp2p_total_dial_attempts.inc()
await transport.dial(hostname, address, peerId)
except CancelledError as exc:
trace "Dialing canceled", err = exc.msg, peerId = peerId.get(default(PeerId))
trace "Dialing canceled",
description = exc.msg, peerId = peerId.get(default(PeerId))
raise exc
except CatchableError as exc:
debug "Dialing failed", err = exc.msg, peerId = peerId.get(default(PeerId))
debug "Dialing failed",
description = exc.msg, peerId = peerId.get(default(PeerId))
libp2p_failed_dials.inc()
return nil # Try the next address
@@ -87,7 +89,7 @@ proc dialAndUpgrade(
# we won't succeeded through another - no use in trying again
await dialed.close()
debug "Connection upgrade failed",
err = exc.msg, peerId = peerId.get(default(PeerId))
description = exc.msg, peerId = peerId.get(default(PeerId))
if dialed.dir == Direction.Out:
libp2p_failed_upgrades_outgoing.inc()
else:
@@ -200,7 +202,7 @@ proc internalConnect(
PeerEvent(kind: PeerEventKind.Identified, initiator: true),
)
except CatchableError as exc:
trace "Failed to finish outgoung upgrade", err = exc.msg
trace "Failed to finish outgoung upgrade", description = exc.msg
await muxed.close()
raise exc
@@ -327,7 +329,7 @@ method dial*(
await cleanup()
raise exc
except CatchableError as exc:
debug "Error dialing", conn, err = exc.msg
debug "Error dialing", conn, description = exc.msg
await cleanup()
raise exc

View File

@@ -64,7 +64,7 @@ method advertise*(self: RendezVousInterface) {.async.} =
try:
await self.rdv.advertise(toAdv, self.ttl)
except CatchableError as error:
debug "RendezVous advertise error: ", msg = error.msg
debug "RendezVous advertise error: ", description = error.msg
await sleepAsync(self.timeToAdvertise) or self.advertisementUpdated.wait()

View File

@@ -31,7 +31,7 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
# We still don't abort but warn
debug "A future has failed, enable trace logging for details",
error = exc.name
trace "Exception message", msg = exc.msg, stack = getStackTrace()
trace "Exception message", description = exc.msg, stack = getStackTrace()
else:
quote:
for res in `futs`:
@@ -40,9 +40,9 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
let exc = res.readError()
for i in 0 ..< `nexclude`:
if exc of `exclude`[i]:
trace "A future has failed", error = exc.name, msg = exc.msg
trace "A future has failed", error = exc.name, description = exc.msg
break check
# We still don't abort but warn
debug "A future has failed, enable trace logging for details",
error = exc.name
trace "Exception details", msg = exc.msg
trace "Exception details", description = exc.msg

View File

@@ -12,7 +12,7 @@
{.push raises: [].}
{.push public.}
import pkg/chronos, chronicles
import pkg/[chronos, chronicles, results]
import std/[nativesockets, net, hashes]
import tables, strutils, sets
import
@@ -25,8 +25,8 @@ import
protobuf/minprotobuf,
errors,
utility
import stew/[base58, base32, endians2, results]
export results, minprotobuf, vbuffer, utility
import stew/[base58, base32, endians2]
export results, minprotobuf, vbuffer, errors, utility
logScope:
topics = "libp2p multiaddress"
@@ -71,6 +71,9 @@ type
tcpProtocol
udpProtocol
func maErr*(msg: string): ref MaError =
(ref MaError)(msg: msg)
const
# These are needed in order to avoid an ambiguity error stemming from
# some cint constants with the same name defined in the posix modules
@@ -408,7 +411,12 @@ const
UDP_IP* = mapAnd(IP, mapEq("udp"))
UDP* = mapOr(UDP_DNS, UDP_IP)
UTP* = mapAnd(UDP, mapEq("utp"))
QUIC* = mapAnd(UDP, mapEq("quic"))
QUIC_IP* = mapAnd(UDP_IP, mapEq("quic"))
QUIC_DNS* = mapAnd(UDP_DNS, mapEq("quic"))
QUIC* = mapOr(QUIC_DNS, QUIC_IP)
QUIC_V1_IP* = mapAnd(UDP_IP, mapEq("quic-v1"))
QUIC_V1_DNS* = mapAnd(UDP_DNS, mapEq("quic-v1"))
QUIC_V1* = mapOr(QUIC_V1_DNS, QUIC_V1_IP)
UNIX* = mapEq("unix")
WS_DNS* = mapAnd(TCP_DNS, mapEq("ws"))
WS_IP* = mapAnd(TCP_IP, mapEq("ws"))
@@ -970,23 +978,21 @@ proc append*(m1: var MultiAddress, m2: MultiAddress): MaResult[void] =
else:
ok()
proc `&`*(m1, m2: MultiAddress): MultiAddress {.raises: [LPError].} =
proc `&`*(m1, m2: MultiAddress): MultiAddress {.raises: [MaError].} =
## Concatenates two addresses ``m1`` and ``m2``, and returns result.
##
## This procedure performs validation of concatenated result and can raise
## exception on error.
##
concat(m1, m2).valueOr:
raise maErr error
concat(m1, m2).tryGet()
proc `&=`*(m1: var MultiAddress, m2: MultiAddress) {.raises: [LPError].} =
proc `&=`*(m1: var MultiAddress, m2: MultiAddress) {.raises: [MaError].} =
## Concatenates two addresses ``m1`` and ``m2``.
##
## This procedure performs validation of concatenated result and can raise
## exception on error.
##
m1.append(m2).tryGet()
m1.append(m2).isOkOr:
raise maErr error
proc `==`*(m1: var MultiAddress, m2: MultiAddress): bool =
## Check of two MultiAddress are equal

View File

@@ -212,7 +212,7 @@ proc handle*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Exception in multistream", conn, msg = exc.msg
trace "Exception in multistream", conn, description = exc.msg
finally:
await conn.close()

View File

@@ -116,7 +116,7 @@ proc reset*(s: LPChannel) {.async: (raises: []).} =
trace "sending reset message", s, conn = s.conn
await noCancel s.conn.writeMsg(s.id, s.resetCode) # write reset
except LPStreamError as exc:
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
trace "Can't send reset message", s, conn = s.conn, description = exc.msg
await s.conn.close()
asyncSpawn resetMessage()
@@ -145,7 +145,7 @@ method close*(s: LPChannel) {.async: (raises: []).} =
# It's harmless that close message cannot be sent - the connection is
# likely down already
await s.conn.close()
trace "Cannot send close message", s, id = s.id, msg = exc.msg
trace "Cannot send close message", s, id = s.id, description = exc.msg
await s.closeUnderlying() # maybe already eofed
@@ -256,7 +256,7 @@ proc completeWrite(
except LPStreamEOFError as exc:
raise exc
except LPStreamError as exc:
trace "exception in lpchannel write handler", s, msg = exc.msg
trace "exception in lpchannel write handler", s, description = exc.msg
await s.reset()
await s.conn.close()
raise newLPStreamConnDownError(exc)

View File

@@ -70,7 +70,7 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async: (raises: []), inline.} =
labelValues = [$chann.initiator, $m.connection.peerId],
)
except CancelledError as exc:
warn "Error cleaning up mplex channel", m, chann, msg = exc.msg
warn "Error cleaning up mplex channel", m, chann, description = exc.msg
proc newStreamInternal*(
m: Mplex,
@@ -175,7 +175,7 @@ method handle*(m: Mplex) {.async: (raises: []).} =
except LPStreamClosedError as exc:
# Channel is being closed, but `cleanupChann` was not yet triggered.
trace "pushing data to channel failed",
m, channel, len = data.len, msg = exc.msg
m, channel, len = data.len, description = exc.msg
discard # Ignore message, same as if `cleanupChann` had completed.
of MessageType.CloseIn, MessageType.CloseOut:
await channel.pushEof()
@@ -185,11 +185,11 @@ method handle*(m: Mplex) {.async: (raises: []).} =
except CancelledError:
debug "Unexpected cancellation in mplex handler", m
except LPStreamEOFError as exc:
trace "Stream EOF", m, msg = exc.msg
trace "Stream EOF", m, description = exc.msg
except LPStreamError as exc:
debug "Unexpected stream exception in mplex read loop", m, msg = exc.msg
debug "Unexpected stream exception in mplex read loop", m, description = exc.msg
except MuxerError as exc:
debug "Unexpected muxer exception in mplex read loop", m, msg = exc.msg
debug "Unexpected muxer exception in mplex read loop", m, description = exc.msg
finally:
await m.close()
trace "Stopped mplex handler", m

View File

@@ -513,9 +513,9 @@ method close*(m: Yamux) {.async: (raises: []).} =
try:
await m.connection.write(YamuxHeader.goAway(NormalTermination))
except CancelledError as exc:
trace "cancelled sending goAway", msg = exc.msg
trace "cancelled sending goAway", description = exc.msg
except LPStreamError as exc:
trace "failed to send goAway", msg = exc.msg
trace "failed to send goAway", description = exc.msg
await m.connection.close()
trace "Closed yamux"
@@ -601,7 +601,7 @@ method handle*(m: Yamux) {.async: (raises: []).} =
if header.length > 0:
var buffer = newSeqUninitialized[byte](header.length)
await m.connection.readExactly(addr buffer[0], int(header.length))
trace "Msg Rcv", msg = shortLog(buffer)
trace "Msg Rcv", description = shortLog(buffer)
await channel.gotDataFromRemote(buffer)
if MsgFlags.Fin in header.flags:
@@ -611,19 +611,19 @@ method handle*(m: Yamux) {.async: (raises: []).} =
trace "remote reset channel"
await channel.reset()
except CancelledError as exc:
debug "Unexpected cancellation in yamux handler", msg = exc.msg
debug "Unexpected cancellation in yamux handler", description = exc.msg
except LPStreamEOFError as exc:
trace "Stream EOF", msg = exc.msg
trace "Stream EOF", description = exc.msg
except LPStreamError as exc:
debug "Unexpected stream exception in yamux read loop", msg = exc.msg
debug "Unexpected stream exception in yamux read loop", description = exc.msg
except YamuxError as exc:
trace "Closing yamux connection", error = exc.msg
trace "Closing yamux connection", description = exc.msg
try:
await m.connection.write(YamuxHeader.goAway(ProtocolError))
except CancelledError, LPStreamError:
discard
except MuxerError as exc:
debug "Unexpected muxer exception in yamux read loop", msg = exc.msg
debug "Unexpected muxer exception in yamux read loop", description = exc.msg
try:
await m.connection.write(YamuxHeader.goAway(ProtocolError))
except CancelledError, LPStreamError:

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -10,7 +10,7 @@
{.push raises: [].}
import
std/[streams, strutils, sets, sequtils],
std/[streams, sets, sequtils],
chronos,
chronicles,
stew/byteutils,
@@ -39,24 +39,32 @@ proc questionToBuf(address: string, kind: QKind): seq[byte] =
var buf = newSeq[byte](dataLen)
discard requestStream.readData(addr buf[0], dataLen)
return buf
except CatchableError as exc:
info "Failed to created DNS buffer", msg = exc.msg
return newSeq[byte](0)
buf
except IOError as exc:
info "Failed to created DNS buffer", description = exc.msg
newSeq[byte](0)
except OSError as exc:
info "Failed to created DNS buffer", description = exc.msg
newSeq[byte](0)
except ValueError as exc:
info "Failed to created DNS buffer", description = exc.msg
newSeq[byte](0)
proc getDnsResponse(
dnsServer: TransportAddress, address: string, kind: QKind
): Future[Response] {.async.} =
): Future[Response] {.
async: (raises: [CancelledError, IOError, OSError, TransportError, ValueError])
.} =
var sendBuf = questionToBuf(address, kind)
if sendBuf.len == 0:
raise newException(ValueError, "Incorrect DNS query")
let receivedDataFuture = newFuture[void]()
let receivedDataFuture = Future[void].Raising([CancelledError]).init()
proc datagramDataReceived(
transp: DatagramTransport, raddr: TransportAddress
): Future[void] {.async, closure.} =
): Future[void] {.async: (raises: []), closure.} =
receivedDataFuture.complete()
let sock =
@@ -68,27 +76,41 @@ proc getDnsResponse(
try:
await sock.sendTo(dnsServer, addr sendBuf[0], sendBuf.len)
await receivedDataFuture or sleepAsync(5.seconds) #unix default
if not receivedDataFuture.finished:
try:
await receivedDataFuture.wait(5.seconds) #unix default
except AsyncTimeoutError:
raise newException(IOError, "DNS server timeout")
let rawResponse = sock.getMessage()
# parseResponse can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
return exceptionToAssert:
try:
parseResponse(string.fromBytes(rawResponse))
except IOError as exc:
raise exc
except OSError as exc:
raise exc
except ValueError as exc:
raise exc
except Exception as exc:
# Nim 1.6: parseResponse can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert exc.msg
finally:
await sock.closeWait()
method resolveIp*(
self: DnsResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError])
.} =
trace "Resolving IP using DNS", address, servers = self.nameServers.mapIt($it), domain
for _ in 0 ..< self.nameServers.len:
let server = self.nameServers[0]
var responseFutures: seq[Future[Response]]
var responseFutures: seq[
Future[Response].Raising(
[CancelledError, IOError, OSError, TransportError, ValueError]
)
]
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
responseFutures.add(getDnsResponse(server, address, A))
@@ -103,23 +125,32 @@ method resolveIp*(
var
resolvedAddresses: OrderedSet[string]
resolveFailed = false
template handleFail(e): untyped =
info "Failed to query DNS", address, error = e.msg
resolveFailed = true
break
for fut in responseFutures:
try:
let resp = await fut
for answer in resp.answers:
# toString can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
resolvedAddresses.incl(exceptionToAssert(answer.toString()))
resolvedAddresses.incl(answer.toString())
except CancelledError as e:
raise e
except ValueError as e:
info "Invalid DNS query", address, error = e.msg
return @[]
except CatchableError as e:
info "Failed to query DNS", address, error = e.msg
resolveFailed = true
break
except IOError as e:
handleFail(e)
except OSError as e:
handleFail(e)
except TransportError as e:
handleFail(e)
except Exception as e:
# Nim 1.6: answer.toString can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert e.msg
if resolveFailed:
self.nameServers.add(self.nameServers[0])
@@ -132,27 +163,39 @@ method resolveIp*(
debug "Failed to resolve address, returning empty set"
return @[]
method resolveTxt*(self: DnsResolver, address: string): Future[seq[string]] {.async.} =
method resolveTxt*(
self: DnsResolver, address: string
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
trace "Resolving TXT using DNS", address, servers = self.nameServers.mapIt($it)
for _ in 0 ..< self.nameServers.len:
let server = self.nameServers[0]
try:
# toString can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
let response = await getDnsResponse(server, address, TXT)
return exceptionToAssert:
trace "Got TXT response",
server = $server, answer = response.answers.mapIt(it.toString())
response.answers.mapIt(it.toString())
except CancelledError as e:
raise e
except CatchableError as e:
template handleFail(e): untyped =
info "Failed to query DNS", address, error = e.msg
self.nameServers.add(self.nameServers[0])
self.nameServers.delete(0)
continue
try:
let response = await getDnsResponse(server, address, TXT)
trace "Got TXT response",
server = $server, answer = response.answers.mapIt(it.toString())
return response.answers.mapIt(it.toString())
except CancelledError as e:
raise e
except IOError as e:
handleFail(e)
except OSError as e:
handleFail(e)
except TransportError as e:
handleFail(e)
except ValueError as e:
handleFail(e)
except Exception as e:
# Nim 1.6: toString can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert e.msg
debug "Failed to resolve TXT, returning empty set"
return @[]

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -25,17 +25,25 @@ type MockResolver* = ref object of NameResolver
method resolveIp*(
self: MockResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError])
.} =
var res: seq[TransportAddress]
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, false)):
result.add(initTAddress(resp, port))
res.add(initTAddress(resp, port))
if domain == Domain.AF_INET6 or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, true)):
result.add(initTAddress(resp, port))
res.add(initTAddress(resp, port))
method resolveTxt*(self: MockResolver, address: string): Future[seq[string]] {.async.} =
return self.txtResponses.getOrDefault(address)
res
method resolveTxt*(
self: MockResolver, address: string
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
self.txtResponses.getOrDefault(address)
proc new*(T: typedesc[MockResolver]): T =
T()

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -9,7 +9,7 @@
{.push raises: [].}
import std/[sugar, sets, sequtils, strutils]
import std/[sets, sequtils, strutils]
import chronos, chronicles, stew/endians2
import ".."/[multiaddress, multicodec]
@@ -20,19 +20,17 @@ type NameResolver* = ref object of RootObj
method resolveTxt*(
self: NameResolver, address: string
): Future[seq[string]] {.async, base.} =
): Future[seq[string]] {.async: (raises: [CancelledError]), base.} =
## Get TXT record
##
doAssert(false, "Not implemented!")
raiseAssert "Not implemented!"
method resolveIp*(
self: NameResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async, base.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError]), base
.} =
## Resolve the specified address
##
doAssert(false, "Not implemented!")
raiseAssert "Not implemented!"
proc getHostname*(ma: MultiAddress): string =
let
@@ -46,30 +44,40 @@ proc getHostname*(ma: MultiAddress): string =
proc resolveOneAddress(
self: NameResolver, ma: MultiAddress, domain: Domain = Domain.AF_UNSPEC, prefix = ""
): Future[seq[MultiAddress]] {.async.} =
#Resolve a single address
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
# Resolve a single address
let portPart = ma[1].valueOr:
raise maErr error
var pbuf: array[2, byte]
var dnsval = getHostname(ma)
if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0:
raise newException(MaError, "Incorrect port number")
let plen = portPart.protoArgument(pbuf).valueOr:
raise maErr error
if plen == 0:
raise maErr "Incorrect port number"
let
port = Port(fromBytesBE(uint16, pbuf))
dnsval = getHostname(ma)
resolvedAddresses = await self.resolveIp(prefix & dnsval, port, domain)
return collect(newSeqOfCap(4)):
for address in resolvedAddresses:
var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet()
for part in ma:
if DNS.match(part.tryGet()):
continue
createdAddress &= part.tryGet()
createdAddress
resolvedAddresses.mapIt:
let address = MultiAddress.init(it).valueOr:
raise maErr error
var createdAddress = address[0].valueOr:
raise maErr error
for part in ma:
let part = part.valueOr:
raise maErr error
if DNS.match(part):
continue
createdAddress &= part
createdAddress
proc resolveDnsAddr*(
self: NameResolver, ma: MultiAddress, depth: int = 0
): Future[seq[MultiAddress]] {.async.} =
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
if not DNSADDR.matchPartial(ma):
return @[ma]
@@ -78,54 +86,67 @@ proc resolveDnsAddr*(
info "Stopping DNSADDR recursion, probably malicious", ma
return @[]
var dnsval = getHostname(ma)
let txt = await self.resolveTxt("_dnsaddr." & dnsval)
let
dnsval = getHostname(ma)
txt = await self.resolveTxt("_dnsaddr." & dnsval)
trace "txt entries", txt
var result: seq[MultiAddress]
const codec = multiCodec("p2p")
let maCodec = block:
let hasCodec = ma.contains(codec).valueOr:
raise maErr error
if hasCodec:
ma[codec]
else:
(static(default(MaResult[MultiAddress])))
var res: seq[MultiAddress]
for entry in txt:
if not entry.startsWith("dnsaddr="):
continue
let entryValue = MultiAddress.init(entry[8 ..^ 1]).tryGet()
if entryValue.contains(multiCodec("p2p")).tryGet() and
ma.contains(multiCodec("p2p")).tryGet():
if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]:
continue
let
entryValue = MultiAddress.init(entry[8 ..^ 1]).valueOr:
raise maErr error
entryHasCodec = entryValue.contains(multiCodec("p2p")).valueOr:
raise maErr error
if entryHasCodec and maCodec.isOk and entryValue[codec] != maCodec:
continue
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
for r in resolved:
result.add(r)
res.add(r)
if result.len == 0:
if res.len == 0:
debug "Failed to resolve a DNSADDR", ma
return @[]
return result
res
proc resolveMAddress*(
self: NameResolver, address: MultiAddress
): Future[seq[MultiAddress]] {.async.} =
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
var res = initOrderedSet[MultiAddress]()
if not DNS.matchPartial(address):
res.incl(address)
else:
let code = address[0].tryGet().protoCode().tryGet()
let seq =
case code
of multiCodec("dns"):
await self.resolveOneAddress(address)
of multiCodec("dns4"):
await self.resolveOneAddress(address, Domain.AF_INET)
of multiCodec("dns6"):
await self.resolveOneAddress(address, Domain.AF_INET6)
of multiCodec("dnsaddr"):
await self.resolveDnsAddr(address)
else:
assert false
@[address]
for ad in seq:
let
firstPart = address[0].valueOr:
raise maErr error
code = firstPart.protoCode().valueOr:
raise maErr error
ads =
case code
of multiCodec("dns"):
await self.resolveOneAddress(address)
of multiCodec("dns4"):
await self.resolveOneAddress(address, Domain.AF_INET)
of multiCodec("dns6"):
await self.resolveOneAddress(address, Domain.AF_INET6)
of multiCodec("dnsaddr"):
await self.resolveDnsAddr(address)
else:
raise maErr("Unsupported codec " & $code)
for ad in ads:
res.incl(ad)
return res.toSeq
res.toSeq

View File

@@ -63,6 +63,7 @@ type
KeyBook* {.public.} = ref object of PeerBook[PublicKey]
AgentBook* {.public.} = ref object of PeerBook[string]
LastSeenBook* {.public.} = ref object of PeerBook[Opt[MultiAddress]]
ProtoVersionBook* {.public.} = ref object of PeerBook[string]
SPRBook* {.public.} = ref object of PeerBook[Envelope]
@@ -145,10 +146,16 @@ proc del*(peerStore: PeerStore, peerId: PeerId) {.public.} =
for _, book in peerStore.books:
book.deletor(peerId)
proc updatePeerInfo*(peerStore: PeerStore, info: IdentifyInfo) =
if info.addrs.len > 0:
proc updatePeerInfo*(
peerStore: PeerStore,
info: IdentifyInfo,
observedAddr: Opt[MultiAddress] = Opt.none(MultiAddress),
) =
if len(info.addrs) > 0:
peerStore[AddressBook][info.peerId] = info.addrs
peerStore[LastSeenBook][info.peerId] = observedAddr
info.pubkey.withValue(pubkey):
peerStore[KeyBook][info.peerId] = pubkey
@@ -200,7 +207,7 @@ proc identify*(peerStore: PeerStore, muxer: Muxer) {.async.} =
knownAgent = shortAgent
muxer.connection.setShortAgent(knownAgent)
peerStore.updatePeerInfo(info)
peerStore.updatePeerInfo(info, stream.observedAddr)
finally:
await stream.closeWithEOF()
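As a usage note (not from the diff), the endpoint recorded during identify can be read back from the new LastSeenBook; the import paths and the echo-based reporting below are illustrative assumptions.

import results
import libp2p/[switch, peerid, peerstore]

proc lastObserved(switch: Switch, peerId: PeerId) =
  # Opt.none(MultiAddress) if the peer was never identified on this switch.
  let observed = switch.peerStore[LastSeenBook][peerId]
  if observed.isSome():
    echo "peer observed our connection from ", $observed.get()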

View File

@@ -84,13 +84,13 @@ proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.asy
except CancelledError as exc:
raise exc
except AllFuturesFailedError as exc:
debug "All dial attempts failed", addrs, exc = exc.msg
debug "All dial attempts failed", addrs, description = exc.msg
await conn.sendResponseError(DialError, "All dial attempts failed")
except AsyncTimeoutError as exc:
debug "Dial timeout", addrs, exc = exc.msg
debug "Dial timeout", addrs, description = exc.msg
await conn.sendResponseError(DialError, "Dial timeout")
except CatchableError as exc:
debug "Unexpected error", addrs, exc = exc.msg
debug "Unexpected error", addrs, description = exc.msg
await conn.sendResponseError(DialError, "Unexpected error")
finally:
autonat.sem.release()
@@ -165,7 +165,7 @@ proc new*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "exception in autonat handler", exc = exc.msg, conn
debug "exception in autonat handler", description = exc.msg, conn
finally:
trace "exiting autonat handler", conn
await conn.close()
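Throughout this changeset the ad-hoc exc/msg/error keys used for exception text are unified under a single description key. A minimal chronicles sketch (illustrative only) of the resulting call shape:

import chronicles

proc report(exc: ref CatchableError) =
  # The error text now appears under the `description` structured field.
  debug "operation failed", description = exc.msg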

View File

@@ -146,13 +146,13 @@ proc askPeer(
debug "dialMe answer is reachable"
Reachable
except AutonatUnreachableError as error:
debug "dialMe answer is not reachable", msg = error.msg
debug "dialMe answer is not reachable", description = error.msg
NotReachable
except AsyncTimeoutError as error:
debug "dialMe timed out", msg = error.msg
debug "dialMe timed out", description = error.msg
Unknown
except CatchableError as error:
debug "dialMe unexpected error", msg = error.msg
debug "dialMe unexpected error", description = error.msg
Unknown
let hasReachabilityOrConfidenceChanged = await self.handleAnswer(ans)
if hasReachabilityOrConfidenceChanged:
@@ -194,7 +194,7 @@ proc addressMapper(
processedMA = peerStore.guessDialableAddr(listenAddr)
# handle manual port forwarding
except CatchableError as exc:
debug "Error while handling address mapper", msg = exc.msg
debug "Error while handling address mapper", description = exc.msg
addrs.add(processedMA)
return addrs

View File

@@ -88,7 +88,7 @@ proc startSync*(
raise err
except AllFuturesFailedError as err:
debug "Dcutr initiator could not connect to the remote peer, all connect attempts failed",
peerDialableAddrs, msg = err.msg
peerDialableAddrs, description = err.msg
raise newException(
DcutrError,
"Dcutr initiator could not connect to the remote peer, all connect attempts failed",
@@ -96,7 +96,7 @@ proc startSync*(
)
except AsyncTimeoutError as err:
debug "Dcutr initiator could not connect to the remote peer, all connect attempts timed out",
peerDialableAddrs, msg = err.msg
peerDialableAddrs, description = err.msg
raise newException(
DcutrError,
"Dcutr initiator could not connect to the remote peer, all connect attempts timed out",
@@ -104,7 +104,7 @@ proc startSync*(
)
except CatchableError as err:
debug "Unexpected error when Dcutr initiator tried to connect to the remote peer",
err = err.msg
description = err.msg
raise newException(
DcutrError,
"Unexpected error when Dcutr initiator tried to connect to the remote peer", err,

View File

@@ -80,13 +80,13 @@ proc new*(
raise err
except AllFuturesFailedError as err:
debug "Dcutr receiver could not connect to the remote peer, " &
"all connect attempts failed", peerDialableAddrs, msg = err.msg
"all connect attempts failed", peerDialableAddrs, description = err.msg
except AsyncTimeoutError as err:
debug "Dcutr receiver could not connect to the remote peer, " &
"all connect attempts timed out", peerDialableAddrs, msg = err.msg
"all connect attempts timed out", peerDialableAddrs, description = err.msg
except CatchableError as err:
warn "Unexpected error when Dcutr receiver tried to connect " &
"to the remote peer", msg = err.msg
"to the remote peer", description = err.msg
let self = T()
self.handler = handleStream

View File

@@ -93,7 +93,7 @@ proc reserve*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error writing or reading reservation message", exc = exc.msg
trace "error writing or reading reservation message", description = exc.msg
raise newException(ReservationError, exc.msg)
if msg.msgType != HopMessageType.Status:
@@ -139,7 +139,7 @@ proc dialPeerV1*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error writing hop request", exc = exc.msg
trace "error writing hop request", description = exc.msg
raise exc
let msgRcvFromRelayOpt =
@@ -148,7 +148,7 @@ proc dialPeerV1*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error reading stop response", exc = exc.msg
trace "error reading stop response", description = exc.msg
await sendStatus(conn, StatusV1.HopCantOpenDstStream)
raise exc
@@ -190,13 +190,13 @@ proc dialPeerV2*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error reading stop response", exc = exc.msg
trace "error reading stop response", description = exc.msg
raise newException(RelayV2DialError, exc.msg)
if msgRcvFromRelay.msgType != HopMessageType.Status:
raise newException(RelayV2DialError, "Unexpected stop response")
if msgRcvFromRelay.status.get(UnexpectedMessage) != Ok:
trace "Relay stop failed", msg = msgRcvFromRelay.status
trace "Relay stop failed", description = msgRcvFromRelay.status
raise newException(RelayV2DialError, "Relay stop failure")
conn.limitDuration = msgRcvFromRelay.limit.duration
conn.limitData = msgRcvFromRelay.limit.data
@@ -302,7 +302,7 @@ proc new*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in client handler", exc = exc.msg, conn
trace "exception in client handler", description = exc.msg, conn
finally:
trace "exiting client handler", conn
await conn.close()

View File

@@ -167,7 +167,7 @@ proc handleConnect(r: Relay, connSrc: Connection, msg: HopMessage) {.async.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error opening relay stream", dst, exc = exc.msg
trace "error opening relay stream", dst, description = exc.msg
await sendHopStatus(connSrc, ConnectionFailed)
return
defer:
@@ -196,7 +196,7 @@ proc handleConnect(r: Relay, connSrc: Connection, msg: HopMessage) {.async.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error sending stop message", msg = exc.msg
trace "error sending stop message", description = exc.msg
await sendHopStatus(connSrc, ConnectionFailed)
return
@@ -213,7 +213,7 @@ proc handleHopStreamV2*(r: Relay, conn: Connection) {.async.} =
let msg = HopMessage.decode(await conn.readLp(r.msgSize)).valueOr:
await sendHopStatus(conn, MalformedMessage)
return
trace "relayv2 handle stream", msg = msg
trace "relayv2 handle stream", hopMsg = msg
case msg.msgType
of HopMessageType.Reserve:
await r.handleReserve(conn)
@@ -272,7 +272,7 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error opening relay stream", dst, exc = exc.msg
trace "error opening relay stream", dst, description = exc.msg
await sendStatus(connSrc, StatusV1.HopCantDialDst)
return
defer:
@@ -289,12 +289,13 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error writing stop handshake or reading stop response", exc = exc.msg
trace "error writing stop handshake or reading stop response",
description = exc.msg
await sendStatus(connSrc, StatusV1.HopCantOpenDstStream)
return
let msgRcvFromDst = msgRcvFromDstOpt.valueOr:
trace "error reading stop response", msg = msgRcvFromDstOpt
trace "error reading stop response", response = msgRcvFromDstOpt
await sendStatus(connSrc, StatusV1.HopCantOpenDstStream)
return
@@ -369,7 +370,7 @@ proc new*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "exception in relayv2 handler", exc = exc.msg, conn
debug "exception in relayv2 handler", description = exc.msg, conn
finally:
trace "exiting relayv2 handler", conn
await conn.close()

View File

@@ -87,7 +87,7 @@ proc bridge*(
trace "relay src closed connection", src = connSrc.peerId
if connDst.closed() or connDst.atEof():
trace "relay dst closed connection", dst = connDst.peerId
trace "relay error", exc = exc.msg
trace "relay error", description = exc.msg
trace "end relaying", bytesSentFromSrcToDst, bytesSentFromDstToSrc
await futSrc.cancelAndWait()
await futDst.cancelAndWait()

View File

@@ -156,7 +156,7 @@ method init*(p: Identify) =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in identify handler", exc = exc.msg, conn
trace "exception in identify handler", description = exc.msg, conn
finally:
trace "exiting identify handler", conn
await conn.closeWithEOF()
@@ -226,7 +226,7 @@ proc init*(p: IdentifyPush) =
except CancelledError as exc:
raise exc
except CatchableError as exc:
info "exception in identify push handler", exc = exc.msg, conn
info "exception in identify push handler", description = exc.msg, conn
finally:
trace "exiting identify push handler", conn
await conn.closeWithEOF()

View File

@@ -49,7 +49,7 @@ proc new*(T: typedesc[Perf]): T {.public.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in perf handler", exc = exc.msg, conn
trace "exception in perf handler", description = exc.msg, conn
await conn.close()
p.handler = handle

View File

@@ -63,7 +63,7 @@ method init*(p: Ping) =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in ping handler", exc = exc.msg, conn
trace "exception in ping handler", description = exc.msg, conn
p.handler = handle
p.codec = PingCodec

View File

@@ -106,7 +106,7 @@ method rpcHandler*(f: FloodSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
debug "failed to decode msg from peer", peer, err = error
raise newException(CatchableError, "Peer msg couldn't be decoded")
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
trace "decoded msg from peer", peer, payload = rpcMsg.shortLog
# trigger hooks
peer.recvObservers(rpcMsg)
@@ -187,7 +187,7 @@ method init*(f: FloodSub) =
# do not need to propagate CancelledError.
trace "Unexpected cancellation in floodsub handler", conn
except CatchableError as exc:
trace "FloodSub handler leaks an error", exc = exc.msg, conn
trace "FloodSub handler leaks an error", description = exc.msg, conn
f.handler = handler
f.codec = FloodSubCodec
@@ -219,7 +219,7 @@ method publish*(f: FloodSub, topic: string, data: seq[byte]): Future[int] {.asyn
trace "Error generating message id, skipping publish", error = error
return 0
trace "Created new message", msg = shortLog(msg), peers = peers.len, topic, msgId
trace "Created new message", payload = shortLog(msg), peers = peers.len, topic, msgId
if f.addSeen(f.salt(msgId)):
# custom msgid providers might cause this

View File

@@ -220,7 +220,7 @@ method init*(g: GossipSub) =
# do not need to propagate CancelledError.
trace "Unexpected cancellation in gossipsub handler", conn
except CatchableError as exc:
trace "GossipSub handler leaks an error", exc = exc.msg, conn
trace "GossipSub handler leaks an error", description = exc.msg, conn
g.handler = handler
g.codecs &= GossipSubCodec_12
@@ -368,7 +368,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
trace "sending control message", msg = shortLog(respControl), peer
trace "sending control message", payload = shortLog(respControl), peer
g.send(peer, RPCMsg(control: some(respControl)), isHighPriority = true)
if messages.len > 0:
@@ -491,7 +491,7 @@ proc validateAndRelay(
await handleData(g, topic, msg.data)
except CatchableError as exc:
info "validateAndRelay failed", msg = exc.msg
info "validateAndRelay failed", description = exc.msg
proc dataAndTopicsIdSize(msgs: seq[Message]): int =
msgs.mapIt(it.data.len + it.topic.len).foldl(a + b, 0)
@@ -540,7 +540,7 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
for m in rpcMsg.messages:
libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, m.topic])
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
trace "decoded msg from peer", peer, payload = rpcMsg.shortLog
await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))
# trigger hooks - these may modify the message
@@ -771,7 +771,7 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy
logScope:
msgId = shortLog(msgId)
trace "Created new message", msg = shortLog(msg), peers = peers.len
trace "Created new message", payload = shortLog(msg), peers = peers.len
if g.addSeen(g.salt(msgId)):
# If the message was received or published recently, don't re-publish it -
@@ -806,7 +806,7 @@ proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.as
trace "Direct peer dial canceled"
raise exc
except CatchableError as exc:
debug "Direct peer error dialing", msg = exc.msg
debug "Direct peer error dialing", description = exc.msg
proc addDirectPeer*(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
g.parameters.directPeers[id] = addrs

View File

@@ -135,7 +135,7 @@ proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
try:
await g.switch.disconnect(peer.peerId)
except CatchableError as exc: # Never cancelled
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
trace "Failed to close connection", peer, errName = exc.name, description = exc.msg
proc disconnectIfBadScorePeer*(g: GossipSub, peer: PubSubPeer, score: float64) =
if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and

View File

@@ -197,7 +197,7 @@ proc send*(
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
## priority messages have been sent.
trace "sending pubsub message to peer", peer, msg = shortLog(msg)
trace "sending pubsub message to peer", peer, payload = shortLog(msg)
peer.send(msg, p.anonymize, isHighPriority)
proc broadcast*(
@@ -255,7 +255,7 @@ proc broadcast*(
else:
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"])
trace "broadcasting messages to peers", peers = sendPeers.len, msg = shortLog(msg)
trace "broadcasting messages to peers", peers = sendPeers.len, payload = shortLog(msg)
if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
@@ -403,7 +403,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
for fut in futs:
if fut.failed:
let err = fut.readError()
warn "Error in topic handler", msg = err.msg
warn "Error in topic handler", description = err.msg
return waiter()
@@ -437,7 +437,7 @@ method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception ocurred in pubsub handle", exc = exc.msg, conn
trace "exception ocurred in pubsub handle", description = exc.msg, conn
finally:
await conn.closeWithEOF()

View File

@@ -205,10 +205,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
data = newSeq[byte]() # Release memory
except PeerRateLimitError as exc:
debug "Peer rate limit exceeded, exiting read while",
conn, peer = p, error = exc.msg
conn, peer = p, description = exc.msg
except CatchableError as exc:
debug "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, exc = exc.msg
conn, peer = p, closed = conn.closed, description = exc.msg
finally:
await conn.close()
except CancelledError:
@@ -217,7 +217,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
trace "Unexpected cancellation in PubSubPeer.handle"
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, exc = exc.msg
conn, peer = p, closed = conn.closed, description = exc.msg
finally:
debug "exiting pubsub read loop", conn, peer = p, closed = conn.closed
@@ -236,7 +236,7 @@ proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Errors during diconnection events", error = exc.msg
debug "Errors during diconnection events", description = exc.msg
# don't cleanup p.address else we leak some gossip stat table
proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
@@ -283,7 +283,7 @@ proc connectImpl(p: PubSubPeer) {.async.} =
return
await connectOnce(p)
except CatchableError as exc: # never cancelled
debug "Could not establish send connection", msg = exc.msg
debug "Could not establish send connection", description = exc.msg
proc connect*(p: PubSubPeer) =
if p.connected:
@@ -325,7 +325,7 @@ proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg
trace "Unable to send to remote", conn, description = exc.msg
# Next time sendConn is used, it will have its close flag set and thus
# will be recycled
@@ -341,7 +341,7 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
var conn = p.sendConn
if conn == nil or conn.closed():
debug "No send connection", p, msg = shortLog(msg)
debug "No send connection", p, payload = shortLog(msg)
return
trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
@@ -383,7 +383,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
) == 0
if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
debug "empty message, skipping", p, payload = shortLog(msg)
Future[void].completed()
elif msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub",

View File

@@ -310,7 +310,7 @@ proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} =
ok(msgs)
proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
trace "encodeRpcMsg: encoding message", msg = msg.shortLog()
trace "encodeRpcMsg: encoding message", payload = msg.shortLog()
var pb = initProtoBuffer(maxSize = uint.high)
for item in msg.subscriptions:
pb.write(1, item)
@@ -329,7 +329,7 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
pb.buffer
proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
trace "decodeRpcMsg: decoding message", msg = msg.shortLog()
trace "decodeRpcMsg: decoding message", payload = msg.shortLog()
var pb = initProtoBuffer(msg, maxSize = uint.high)
var rpcMsg = RPCMsg()
assign(rpcMsg.messages, ?pb.decodeMessages())

View File

@@ -35,8 +35,6 @@ const
RendezVousCodec* = "/rendezvous/1.0.0"
MinimumDuration* = 2.hours
MaximumDuration = 72.hours
MinimumTTL = MinimumDuration.seconds.uint64
MaximumTTL = MaximumDuration.seconds.uint64
RegistrationLimitPerPeer = 1000
DiscoverLimit = 1000'u64
SemaphoreDefaultSize = 5
@@ -320,6 +318,10 @@ type
peers: seq[PeerId]
cookiesSaved: Table[PeerId, Table[string, seq[byte]]]
switch: Switch
minDuration: Duration
maxDuration: Duration
minTTL: uint64
maxTTL: uint64
proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] =
if spr.len == 0:
@@ -395,7 +397,7 @@ proc save(
rdv.registered.add(
RegisteredData(
peerId: peerId,
expiration: Moment.now() + r.ttl.get(MinimumTTL).int64.seconds,
expiration: Moment.now() + r.ttl.get(rdv.minTTL).int64.seconds,
data: r,
)
)
@@ -409,8 +411,8 @@ proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
libp2p_rendezvous_register.inc()
if r.ns.len notin 1 .. 255:
return conn.sendRegisterResponseError(InvalidNamespace)
let ttl = r.ttl.get(MinimumTTL)
if ttl notin MinimumTTL .. MaximumTTL:
let ttl = r.ttl.get(rdv.minTTL)
if ttl notin rdv.minTTL .. rdv.maxTTL:
return conn.sendRegisterResponseError(InvalidTTL)
let pr = checkPeerRecord(r.signedPeerRecord, conn.peerId)
if pr.isErr():
@@ -499,31 +501,42 @@ proc advertisePeer(rdv: RendezVous, peer: PeerId, msg: seq[byte]) {.async.} =
else:
trace "Successfully registered", peer, response = msgRecv.registerResponse
except CatchableError as exc:
trace "exception in the advertise", error = exc.msg
trace "exception in the advertise", description = exc.msg
finally:
rdv.sema.release()
await rdv.sema.acquire()
discard await advertiseWrap().withTimeout(5.seconds)
method advertise*(
rdv: RendezVous, ns: string, ttl: Duration = MinimumDuration
) {.async, base.} =
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
raise newException(RendezVousError, "Wrong Signed Peer Record")
proc advertise*(
rdv: RendezVous, ns: string, ttl: Duration, peers: seq[PeerId]
) {.async.} =
if ns.len notin 1 .. 255:
raise newException(RendezVousError, "Invalid namespace")
if ttl notin MinimumDuration .. MaximumDuration:
raise newException(RendezVousError, "Invalid time to live")
if ttl notin rdv.minDuration .. rdv.maxDuration:
raise newException(RendezVousError, "Invalid time to live: " & $ttl)
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
raise newException(RendezVousError, "Wrong Signed Peer Record")
let
r = Register(ns: ns, signedPeerRecord: sprBuff, ttl: Opt.some(ttl.seconds.uint64))
msg = encode(Message(msgType: MessageType.Register, register: Opt.some(r)))
rdv.save(ns, rdv.switch.peerInfo.peerId, r)
let fut = collect(newSeq()):
for peer in rdv.peers:
let futs = collect(newSeq()):
for peer in peers:
trace "Send Advertise", peerId = peer, ns
rdv.advertisePeer(peer, msg.buffer)
await allFutures(fut)
await allFutures(futs)
method advertise*(
rdv: RendezVous, ns: string, ttl: Duration = rdv.minDuration
) {.async, base.} =
await rdv.advertise(ns, ttl, rdv.peers)
proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
let
@@ -540,9 +553,8 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
@[]
proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
): Future[seq[PeerRecord]] {.async.} =
let nsSalted = ns & rdv.salt
var
s: Table[PeerId, (PeerRecord, Register)]
limit: uint64
@@ -587,8 +599,8 @@ proc request*(
for r in resp.registrations:
if limit == 0:
return
let ttl = r.ttl.get(MaximumTTL + 1)
if ttl > MaximumTTL:
let ttl = r.ttl.get(rdv.maxTTL + 1)
if ttl > rdv.maxTTL:
continue
let
spr = SignedPeerRecord.decode(r.signedPeerRecord).valueOr:
@@ -596,7 +608,7 @@ proc request*(
pr = spr.data
if s.hasKey(pr.peerId):
let (prSaved, rSaved) = s[pr.peerId]
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(MaximumTTL) < ttl) or
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(rdv.maxTTL) < ttl) or
prSaved.seqNo < pr.seqNo:
s[pr.peerId] = (pr, r)
else:
@@ -605,8 +617,6 @@ proc request*(
for (_, r) in s.values():
rdv.save(ns, peer, r, false)
# copy to avoid resizes during the loop
let peers = rdv.peers
for peer in peers:
if limit == 0:
break
@@ -618,9 +628,14 @@ proc request*(
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception catch in request", error = exc.msg
trace "exception catch in request", description = exc.msg
return toSeq(s.values()).mapIt(it[0])
proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
): Future[seq[PeerRecord]] {.async.} =
await rdv.request(ns, l, rdv.peers)
proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
let nsSalted = ns & rdv.salt
try:
@@ -630,26 +645,33 @@ proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
except KeyError:
return
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
# TODO: find a way to improve this, maybe something similar to the advertise
proc unsubscribe*(rdv: RendezVous, ns: string, peerIds: seq[PeerId]) {.async.} =
if ns.len notin 1 .. 255:
raise newException(RendezVousError, "Invalid namespace")
rdv.unsubscribeLocally(ns)
let msg = encode(
Message(msgType: MessageType.Unregister, unregister: Opt.some(Unregister(ns: ns)))
)
proc unsubscribePeer(rdv: RendezVous, peerId: PeerId) {.async.} =
proc unsubscribePeer(peerId: PeerId) {.async.} =
try:
let conn = await rdv.switch.dial(peerId, RendezVousCodec)
defer:
await conn.close()
await conn.writeLp(msg.buffer)
except CatchableError as exc:
trace "exception while unsubscribing", error = exc.msg
trace "exception while unsubscribing", description = exc.msg
for peer in rdv.peers:
discard await rdv.unsubscribePeer(peer).withTimeout(5.seconds)
let futs = collect(newSeq()):
for peer in peerIds:
unsubscribePeer(peer)
discard await allFutures(futs).withTimeout(5.seconds)
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
rdv.unsubscribeLocally(ns)
await rdv.unsubscribe(ns, rdv.peers)
proc setup*(rdv: RendezVous, switch: Switch) =
rdv.switch = switch
@@ -662,7 +684,25 @@ proc setup*(rdv: RendezVous, switch: Switch) =
rdv.switch.addPeerEventHandler(handlePeer, Joined)
rdv.switch.addPeerEventHandler(handlePeer, Left)
proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
proc new*(
T: typedesc[RendezVous],
rng: ref HmacDrbgContext = newRng(),
minDuration = MinimumDuration,
maxDuration = MaximumDuration,
): T {.raises: [RendezVousError].} =
if minDuration < 1.minutes:
raise newException(RendezVousError, "TTL too short: 1 minute minimum")
if maxDuration > 72.hours:
raise newException(RendezVousError, "TTL too long: 72 hours maximum")
if minDuration >= maxDuration:
raise newException(RendezVousError, "Minimum TTL longer than maximum")
let
minTTL = minDuration.seconds.uint64
maxTTL = maxDuration.seconds.uint64
let rdv = T(
rng: rng,
salt: string.fromBytes(generateBytes(rng[], 8)),
@@ -670,6 +710,10 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
defaultDT: Moment.now() - 1.days,
#registerEvent: newAsyncEvent(),
sema: newAsyncSemaphore(SemaphoreDefaultSize),
minDuration: minDuration,
maxDuration: maxDuration,
minTTL: minTTL,
maxTTL: maxTTL,
)
logScope:
topics = "libp2p discovery rendezvous"
@@ -692,7 +736,7 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in rendezvous handler", error = exc.msg
trace "exception in rendezvous handler", description = exc.msg
finally:
await conn.close()
@@ -701,9 +745,13 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T =
return rdv
proc new*(
T: typedesc[RendezVous], switch: Switch, rng: ref HmacDrbgContext = newRng()
): T =
let rdv = T.new(rng)
T: typedesc[RendezVous],
switch: Switch,
rng: ref HmacDrbgContext = newRng(),
minDuration = MinimumDuration,
maxDuration = MaximumDuration,
): T {.raises: [RendezVousError].} =
let rdv = T.new(rng, minDuration, maxDuration)
rdv.setup(switch)
return rdv
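A usage sketch of the reworked rendezvous API (not part of the diff): configurable TTL bounds plus peer-scoped advertise, request and unsubscribe. The namespace, the limit of 100 and the rdvPoints sequence are illustrative assumptions.

import chronos
import libp2p/[switch, peerid]
import libp2p/protocols/rendezvous

proc rendezvousExample(switch: Switch, rdvPoints: seq[PeerId]) {.async.} =
  # Bounds outside 1.minutes .. 72.hours, or min >= max, raise RendezVousError.
  let rdv = RendezVous.new(switch, minDuration = 5.minutes, maxDuration = 24.hours)
  switch.mount(rdv)
  # Advertise only towards the given rendezvous points, with a TTL inside the bounds.
  await rdv.advertise("my-app/shard-0", 1.hours, rdvPoints)
  # Query the same subset of peers for registrations under that namespace.
  let records = await rdv.request("my-app/shard-0", 100, rdvPoints)
  echo "discovered ", records.len, " peer records"
  await rdv.unsubscribe("my-app/shard-0", rdvPoints)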

View File

@@ -155,7 +155,7 @@ method init*(s: Secure) =
await conn.close()
raise exc
except LPStreamError as exc:
warn "securing connection failed", err = exc.msg, conn
warn "securing connection failed", description = exc.msg, conn
await conn.close()
s.handler = handle

View File

@@ -326,4 +326,4 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async: (raises: []), public.} =
except LPStreamEOFError:
trace "Expected EOF came", s
except LPStreamError as exc:
debug "Unexpected error while waiting for EOF", s, msg = exc.msg
debug "Unexpected error while waiting for EOF", s, description = exc.msg

View File

@@ -219,7 +219,7 @@ proc upgradeMonitor(
libp2p_failed_upgrades_incoming.inc()
if not isNil(conn):
await conn.close()
trace "Exception awaiting connection upgrade", exc = exc.msg, conn
trace "Exception awaiting connection upgrade", description = exc.msg, conn
finally:
upgrades.release()
@@ -264,7 +264,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
upgrades.release() # always release the slot
return
except CatchableError as exc:
error "Exception in accept loop, exiting", exc = exc.msg
error "Exception in accept loop, exiting", description = exc.msg
upgrades.release() # always release the slot
if not isNil(conn):
await conn.close()
@@ -282,7 +282,7 @@ proc stop*(s: Switch) {.async, public.} =
# Stop accepting incoming connections
await allFutures(s.acceptFuts.mapIt(it.cancelAndWait())).wait(1.seconds)
except CatchableError as exc:
debug "Cannot cancel accepts", error = exc.msg
debug "Cannot cancel accepts", description = exc.msg
for service in s.services:
discard await service.stop(s)
@@ -296,7 +296,7 @@ proc stop*(s: Switch) {.async, public.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "error cleaning up transports", msg = exc.msg
warn "error cleaning up transports", description = exc.msg
await s.ms.stop()

View File

@@ -0,0 +1,224 @@
import std/sequtils
import pkg/chronos
import pkg/chronicles
import pkg/quic
import ../multiaddress
import ../multicodec
import ../stream/connection
import ../wire
import ../muxers/muxer
import ../upgrademngrs/upgrade
import ./transport
export multiaddress
export multicodec
export connection
export transport
logScope:
topics = "libp2p quictransport"
type
P2PConnection = connection.Connection
QuicConnection = quic.Connection
# Stream
type QuicStream* = ref object of P2PConnection
stream: Stream
cached: seq[byte]
proc new(
_: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId
): QuicStream =
let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId)
procCall P2PConnection(quicstream).initStream()
quicstream
template mapExceptions(body: untyped) =
try:
body
except QuicError:
raise newLPStreamEOFError()
except CatchableError:
raise newLPStreamEOFError()
method readOnce*(
stream: QuicStream, pbytes: pointer, nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
try:
if stream.cached.len == 0:
stream.cached = await stream.stream.read()
result = min(nbytes, stream.cached.len)
copyMem(pbytes, addr stream.cached[0], result)
stream.cached = stream.cached[result ..^ 1]
except CatchableError as exc:
raise newLPStreamEOFError()
{.push warning[LockLevel]: off.}
method write*(
stream: QuicStream, bytes: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
mapExceptions(await stream.stream.write(bytes))
{.pop.}
method closeImpl*(stream: QuicStream) {.async: (raises: []).} =
try:
await stream.stream.close()
except CatchableError as exc:
discard
await procCall P2PConnection(stream).closeImpl()
# Session
type QuicSession* = ref object of P2PConnection
connection: QuicConnection
method close*(session: QuicSession) {.async, base.} =
await session.connection.close()
await procCall P2PConnection(session).close()
proc getStream*(
session: QuicSession, direction = Direction.In
): Future[QuicStream] {.async.} =
var stream: Stream
case direction
of Direction.In:
stream = await session.connection.incomingStream()
of Direction.Out:
stream = await session.connection.openStream()
await stream.write(@[]) # QUIC streams do not exist until data is sent
return QuicStream.new(stream, session.observedAddr, session.peerId)
method getWrapped*(self: QuicSession): P2PConnection =
nil
# Muxer
type QuicMuxer = ref object of Muxer
quicSession: QuicSession
handleFut: Future[void]
method newStream*(
m: QuicMuxer, name: string = "", lazy: bool = false
): Future[P2PConnection] {.
async: (raises: [CancelledError, LPStreamError, MuxerError])
.} =
try:
return await m.quicSession.getStream(Direction.Out)
except CatchableError as exc:
raise newException(MuxerError, exc.msg, exc)
proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} =
## call the muxer stream handler for this channel
##
try:
await m.streamHandler(chann)
trace "finished handling stream"
doAssert(chann.closed, "connection not closed by handler!")
except CatchableError as exc:
trace "Exception in mplex stream handler", msg = exc.msg
await chann.close()
method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} =
try:
while not m.quicSession.atEof:
let incomingStream = await m.quicSession.getStream(Direction.In)
asyncSpawn m.handleStream(incomingStream)
except CatchableError as exc:
trace "Exception in mplex handler", msg = exc.msg
method close*(m: QuicMuxer) {.async: (raises: []).} =
try:
await m.quicSession.close()
m.handleFut.cancel()
except CatchableError as exc:
discard
# Transport
type QuicUpgrade = ref object of Upgrade
type QuicTransport* = ref object of Transport
listener: Listener
connections: seq[P2PConnection]
func new*(_: type QuicTransport, u: Upgrade): QuicTransport =
QuicTransport(upgrader: QuicUpgrade(ms: u.ms))
method handles*(transport: QuicTransport, address: MultiAddress): bool =
if not procCall Transport(transport).handles(address):
return false
QUIC_V1.match(address)
method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} =
doAssert transport.listener.isNil, "start() already called"
#TODO handle multiple addr
transport.listener = listen(initTAddress(addrs[0]).tryGet)
await procCall Transport(transport).start(addrs)
transport.addrs[0] =
MultiAddress.init(transport.listener.localAddress(), IPPROTO_UDP).tryGet() &
MultiAddress.init("/quic-v1").get()
transport.running = true
method stop*(transport: QuicTransport) {.async.} =
if transport.running:
for c in transport.connections:
await c.close()
await procCall Transport(transport).stop()
await transport.listener.stop()
transport.running = false
transport.listener = nil
proc wrapConnection(
transport: QuicTransport, connection: QuicConnection
): P2PConnection {.raises: [Defect, TransportOsError, LPError].} =
let
remoteAddr = connection.remoteAddress()
observedAddr =
MultiAddress.init(remoteAddr, IPPROTO_UDP).get() &
MultiAddress.init("/quic-v1").get()
conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr))
conres.initStream()
transport.connections.add(conres)
proc onClose() {.async.} =
await conres.join()
transport.connections.keepItIf(it != conres)
trace "Cleaned up client"
asyncSpawn onClose()
return conres
method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} =
doAssert not transport.listener.isNil, "call start() before calling accept()"
let connection = await transport.listener.accept()
return transport.wrapConnection(connection)
method dial*(
transport: QuicTransport,
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId),
): Future[P2PConnection] {.async, gcsafe.} =
let connection = await dial(initTAddress(address).tryGet)
return transport.wrapConnection(connection)
method upgrade*(
self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId]
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
let qs = QuicSession(conn)
if peerId.isSome:
qs.peerId = peerId.get()
let muxer = QuicMuxer(quicSession: qs, connection: conn)
muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} =
trace "Starting stream handler"
try:
await self.upgrader.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
return
except CatchableError as exc:
trace "exception in stream handler", conn, msg = exc.msg
finally:
await conn.closeWithEOF()
trace "Stream handler done", conn
muxer.handleFut = muxer.handle()
return muxer
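For context, this is how the new transport is expected to be wired into a switch, mirroring the end-to-end test added later in this changeset; treat it as a sketch, with the import paths being assumptions.

import libp2p
import libp2p/crypto/crypto
import libp2p/transports/[transport, quictransport]
import libp2p/upgrademngrs/upgrade

proc newQuicSwitch(): Switch =
  # QUIC listens on a UDP multiaddress carrying the /quic-v1 suffix.
  SwitchBuilder
    .new()
    .withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet())
    .withRng(crypto.newRng())
    .withTransport(
      proc(upgr: Upgrade): Transport =
        QuicTransport.new(upgr)
    )
    .withNoise()
    .build()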

View File

@@ -226,10 +226,17 @@ method accept*(self: TcpTransport): Future[Connection] =
proc impl(
self: TcpTransport
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
proc cancelAcceptFuts() =
for fut in self.acceptFuts:
if not fut.completed():
fut.cancel()
if not self.running:
raise newTransportClosedError()
if self.acceptFuts.len <= 0:
if self.servers.len == 0:
raise (ref TcpTransportError)(msg: "No listeners configured")
elif self.acceptFuts.len == 0:
# Holds futures representing ongoing accept calls on multiple servers.
self.acceptFuts = self.servers.mapIt(it.accept())
@@ -239,7 +246,10 @@ method accept*(self: TcpTransport): Future[Connection] =
# Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers.
await one(self.acceptFuts)
except ValueError:
raise (ref TcpTransportError)(msg: "No listeners configured")
raiseAssert "Accept futures should not be empty"
except CancelledError as exc:
cancelAcceptFuts()
raise exc
index = self.acceptFuts.find(finished)
# A new connection has been accepted. The corresponding server should immediately start accepting another connection.
@@ -249,10 +259,10 @@ method accept*(self: TcpTransport): Future[Connection] =
try:
await finished
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
debug "Too many files opened", description = exc.msg
return nil
except TransportAbortedError as exc:
debug "Connection aborted", exc = exc.msg
debug "Connection aborted", description = exc.msg
return nil
except TransportUseClosedError as exc:
raise newTransportClosedError(exc)
@@ -261,6 +271,7 @@ method accept*(self: TcpTransport): Future[Connection] =
except common.TransportError as exc: # Needed for chronos 4.0.0 support
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
except CancelledError as exc:
cancelAcceptFuts()
raise exc
if not self.running: # Stopped while waiting
@@ -273,7 +284,7 @@ method accept*(self: TcpTransport): Future[Connection] =
except TransportOsError as exc:
# The connection had errors / was closed before `await` returned control
await transp.closeWait()
debug "Cannot read remote address", exc = exc.msg
debug "Cannot read remote address", description = exc.msg
return nil
let observedAddr =

View File

@@ -56,7 +56,7 @@ proc new*(
stream.initStream()
return stream
template mapExceptions(body: untyped) =
template mapExceptions(body: untyped): untyped =
try:
body
except AsyncStreamIncompleteError:
@@ -206,7 +206,7 @@ method stop*(self: WsTransport) {.async.} =
self.httpservers = @[]
trace "Transport stopped"
except CatchableError as exc:
trace "Error shutting down ws transport", exc = exc.msg
trace "Error shutting down ws transport", description = exc.msg
proc connHandler(
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
@@ -223,7 +223,7 @@ proc connHandler(
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet()
except CatchableError as exc:
trace "Failed to create observedAddr", exc = exc.msg
trace "Failed to create observedAddr", description = exc.msg
if not (isNil(stream) and stream.stream.reader.closed):
await stream.close()
raise exc
@@ -271,26 +271,26 @@ method accept*(self: WsTransport): Future[Connection] {.async.} =
await req.stream.closeWait()
raise exc
except WebSocketError as exc:
debug "Websocket Error", exc = exc.msg
debug "Websocket Error", description = exc.msg
except HttpError as exc:
debug "Http Error", exc = exc.msg
debug "Http Error", description = exc.msg
except AsyncStreamError as exc:
debug "AsyncStream Error", exc = exc.msg
debug "AsyncStream Error", description = exc.msg
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
debug "Too many files opened", description = exc.msg
except TransportAbortedError as exc:
debug "Connection aborted", exc = exc.msg
debug "Connection aborted", description = exc.msg
except AsyncTimeoutError as exc:
debug "Timed out", exc = exc.msg
debug "Timed out", description = exc.msg
except TransportUseClosedError as exc:
debug "Server was closed", exc = exc.msg
debug "Server was closed", description = exc.msg
raise newTransportClosedError(exc)
except CancelledError as exc:
raise exc
except TransportOsError as exc:
debug "OS Error", exc = exc.msg
debug "OS Error", description = exc.msg
except CatchableError as exc:
info "Unexpected error accepting connection", exc = exc.msg
info "Unexpected error accepting connection", description = exc.msg
raise exc
method dial*(

View File

@@ -59,7 +59,8 @@ when defined(libp2p_agents_metrics):
KnownLibP2PAgents* {.strdefine.} = "nim-libp2p"
KnownLibP2PAgentsSeq* = KnownLibP2PAgents.safeToLowerAscii().tryGet().split(",")
template safeConvert*[T: SomeInteger, S: Ordinal](value: S): T =
proc safeConvert*[T: SomeInteger](value: SomeOrdinal): T =
type S = typeof(value)
## Converts `value` from S to `T` iff `value` is guaranteed to be preserved.
when int64(T.low) <= int64(S.low()) and uint64(T.high) >= uint64(S.high):
T(value)
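The call-site change this implies, matching the updated tests further down; a small sketch:

import libp2p/utility

# Only the target type is spelled out; the source type is inferred from the argument.
let widened: uint16 = safeConvert[uint16](255'u8)
doAssert widened == 255'u16
# A narrowing call such as safeConvert[uint8](256'u16) is rejected at compile time.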

View File

@@ -20,7 +20,7 @@ when defined(windows): import winlean else: import posix
const
RTRANSPMA* = mapOr(TCP, WebSockets, UNIX)
TRANSPMA* = mapOr(RTRANSPMA, UDP)
TRANSPMA* = mapOr(RTRANSPMA, QUIC, UDP)
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
@@ -28,7 +28,7 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``.
##
if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP).match(ma):
if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP, QUIC_V1_IP).match(ma):
var pbuf: array[2, byte]
let code = (?(?ma[0]).protoCode())
if code == multiCodec("unix"):

View File

@@ -17,10 +17,6 @@ import strutils, os
libp2p_mplex_metrics
--d:
unittestPrintTime
--skipParentCfg
--mm:
refc
# reconsider when there's a version-2-2 branch worth testing with as we might switch to orc
# Only add chronicles param if the
# user didn't specify any

View File

@@ -120,7 +120,7 @@ proc main() {.async.} =
echo &"""{{"rtt_to_holepunched_peer_millis":{delay.millis}}}"""
quit(0)
except CatchableError as e:
error "Unexpected error", msg = e.msg
error "Unexpected error", description = e.msg
discard waitFor(main().withTimeout(4.minutes))
quit(1)

View File

@@ -677,55 +677,76 @@ suite "GossipSub internal":
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.subscribe(topic, handler2)
# Instantiates 30 peers and connects all of them to the previously defined `gossipSub`
for i in 0 ..< 30:
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
# Add the connection to `gossipSub`, to their `gossipSub.gossipsub` and `gossipSub.mesh` tables
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
# Peers with no budget should not request messages
block:
# should ignore no budget peer
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3]
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
peer.iHaveBudget = 0
let iwants = gossipSub.handleIHave(peer, @[msg])
check:
iwants.messageIDs.len == 0
block:
# given duplicate ihave should generate only one iwant
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3]
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
let iwants = gossipSub.handleIHave(peer, @[msg])
check:
iwants.messageIDs.len == 1
block:
# given duplicate iwant should generate only one message
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IHAVE message that contains the same message ID three times
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
# Given the peer has no budget to request messages
peer.iHaveBudget = 0
# When a peer makes an IHAVE request for a message that `gossipSub` has
let iwants = gossipSub.handleIHave(peer, @[msg])
# Then `gossipSub` should not generate an IWant message for the message,
check:
iwants.messageIDs.len == 0
# Peers with budget should request messages. If ids are repeated, only one request should be generated
block:
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3]
# Build an IHAVE message that contains the same message ID three times
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
# Given the budget is not 0 (because it's not been overridden)
# When a peer makes an IHAVE request for a message that `gossipSub` does not have
let iwants = gossipSub.handleIHave(peer, @[msg])
# Then `gossipSub` should generate an IWant message for the message
check:
iwants.messageIDs.len == 1
# Peers requesting messages via IWANT should receive them. If ids are repeated, only one message should be generated
block:
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IWANT message that contains the same message ID three times
let msg = ControlIWant(messageIDs: @[id, id, id])
# When a peer makes an IWANT request for a message that `gossipSub` has
let genmsg = gossipSub.handleIWant(peer, @[msg])
# Then `gossipSub` should return the message
check:
genmsg.len == 1
@@ -773,7 +794,7 @@ suite "GossipSub internal":
var sentMessages = initHashSet[seq[byte]]()
for i, size in enumerate([size1, size2]):
let data = newSeqWith[byte](size, i.byte)
let data = newSeqWith(size, i.byte)
sentMessages.incl(data)
let msg =

View File

@@ -1059,7 +1059,7 @@ suite "GossipSub":
# Simulate sending an undecodable message
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(
newSeqWith[byte](33, 1.byte), isHighPriority = true
newSeqWith(33, 1.byte), isHighPriority = true
)
await sleepAsync(300.millis)
@@ -1069,7 +1069,7 @@ suite "GossipSub":
# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(
newSeqWith[byte](35, 1.byte), isHighPriority = true
newSeqWith(35, 1.byte), isHighPriority = true
)
checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false

View File

@@ -26,10 +26,11 @@ const
SuccessVectors = [
"/ip4/1.2.3.4", "/ip4/0.0.0.0", "/ip6/::1",
"/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21",
"/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic", "/ip6zone/x/ip6/fe80::1",
"/ip6zone/x%y/ip6/fe80::1", "/ip6zone/x%y/ip6/::",
"/ip6zone/x/ip6/fe80::1/udp/1234/quic", "/onion/timaq4ygg2iegci7:1234",
"/onion/timaq4ygg2iegci7:80/http",
"/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic",
"/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic-v1",
"/ip6zone/x/ip6/fe80::1", "/ip6zone/x%y/ip6/fe80::1", "/ip6zone/x%y/ip6/::",
"/ip6zone/x/ip6/fe80::1/udp/1234/quic", "/ip6zone/x/ip6/fe80::1/udp/1234/quic-v1",
"/onion/timaq4ygg2iegci7:1234", "/onion/timaq4ygg2iegci7:80/http",
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234",
"/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:80/http",
"/udp/0", "/tcp/0", "/sctp/0", "/udp/1234", "/tcp/1234", "/sctp/1234", "/udp/65535",
@@ -57,7 +58,7 @@ const
FailureVectors = [
"", "/", "/ip4", "/ip4/::1", "/ip4/fdpsofodsajfdoisa", "/ip6", "/ip6zone",
"/ip6zone/", "/ip6zone//ip6/fe80::1", "/udp", "/tcp", "/sctp", "/udp/65536",
"/tcp/65536", "/quic/65536", "/onion/9imaq4ygg2iegci7:80",
"/tcp/65536", "/quic/65536", "/quic-v1/65536", "/onion/9imaq4ygg2iegci7:80",
"/onion/aaimaq4ygg2iegci7:80", "/onion/timaq4ygg2iegci7:0",
"/onion/timaq4ygg2iegci7:-1", "/onion/timaq4ygg2iegci7",
"/onion/timaq4ygg2iegci@:666",
@@ -70,8 +71,8 @@ const
"/udp/1234/sctp", "/udp/1234/udt/1234", "/udp/1234/utp/1234",
"/ip4/127.0.0.1/udp/jfodsajfidosajfoidsa", "/ip4/127.0.0.1/udp",
"/ip4/127.0.0.1/tcp/jfodsajfidosajfoidsa", "/ip4/127.0.0.1/tcp",
"/ip4/127.0.0.1/quic/1234", "/ip4/127.0.0.1/ipfs", "/ip4/127.0.0.1/ipfs/tcp",
"/ip4/127.0.0.1/p2p", "/ip4/127.0.0.1/p2p/tcp", "/unix",
"/ip4/127.0.0.1/quic/1234", "/ip4/127.0.0.1/quic-v1/1234", "/ip4/127.0.0.1/ipfs",
"/ip4/127.0.0.1/ipfs/tcp", "/ip4/127.0.0.1/p2p", "/ip4/127.0.0.1/p2p/tcp", "/unix",
]
RustSuccessVectors = [
@@ -160,6 +161,15 @@ const
"/quic",
],
),
PatternVector(
pattern: QUIC_V1,
good: @["/ip4/1.2.3.4/udp/1234/quic-v1", "/ip6/::/udp/1234/quic-v1"],
bad:
@[
"/ip4/0.0.0.0/tcp/12345/quic-v1", "/ip6/fc00::/ip4/0.0.0.0/udp/1234/quic-v1",
"/quic-v1",
],
),
PatternVector(
pattern: IPFS,
good:

tests/testquic.nim Normal file
View File

@@ -0,0 +1,24 @@
{.used.}
import sequtils
import chronos, stew/byteutils
import
../libp2p/[
stream/connection,
transports/transport,
transports/quictransport,
upgrademngrs/upgrade,
multiaddress,
errors,
wire,
]
import ./helpers, ./commontransport
suite "Quic transport":
asyncTest "can handle local address":
let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()]
let transport1 = QuicTransport.new()
await transport1.start(ma)
check transport1.handles(transport1.addrs[0])
await transport1.stop()
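A hypothetical follow-up sketch (not in the changeset) showing a loopback dial/accept round trip at the transport level; it relies only on the start, accept, dial and stop methods defined above.

  asyncTest "can dial and accept (sketch)":
    let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()]
    let server = QuicTransport.new()
    await server.start(ma)
    let acceptFut = server.accept()
    let client = QuicTransport.new()
    let conn = await client.dial("", server.addrs[0])
    let serverConn = await acceptFut
    await conn.close()
    await serverConn.close()
    await client.stop()
    await server.stop()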

View File

@@ -126,7 +126,7 @@ suite "RendezVous":
asyncTest "Various local error":
let
rdv = RendezVous.new()
rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
switch = createSwitch(rdv)
expect RendezVousError:
discard await rdv.request("A".repeat(300))
@@ -137,6 +137,14 @@ suite "RendezVous":
expect RendezVousError:
await rdv.advertise("A".repeat(300))
expect RendezVousError:
await rdv.advertise("A", 2.weeks)
await rdv.advertise("A", 73.hours)
expect RendezVousError:
await rdv.advertise("A", 5.minutes)
await rdv.advertise("A", 30.seconds)
test "Various config error":
expect RendezVousError:
discard RendezVous.new(minDuration = 30.seconds)
expect RendezVousError:
discard RendezVous.new(maxDuration = 73.hours)
expect RendezVousError:
discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)

View File

@@ -34,6 +34,7 @@ import
utils/semaphore,
transports/tcptransport,
transports/wstransport,
transports/quictransport,
]
import ./helpers
@@ -841,6 +842,8 @@ suite "Switch":
switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
switch1.peerStore[LastSeenBook][switch2.peerInfo.peerId].isSome()
switch1.peerInfo.peerId notin switch2.peerStore[AddressBook]
switch1.peerInfo.peerId notin switch2.peerStore[ProtoBook]
@@ -988,6 +991,42 @@ suite "Switch":
await srcWsSwitch.stop()
await srcTcpSwitch.stop()
asyncTest "e2e quic transport":
let
quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()
quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()
srcSwitch = SwitchBuilder
.new()
.withAddress(quicAddress1)
.withRng(crypto.newRng())
.withTransport(
proc(upgr: Upgrade): Transport =
QuicTransport.new(upgr)
)
.withNoise()
.build()
destSwitch = SwitchBuilder
.new()
.withAddress(quicAddress2)
.withRng(crypto.newRng())
.withTransport(
proc(upgr: Upgrade): Transport =
QuicTransport.new(upgr)
)
.withNoise()
.build()
await destSwitch.start()
await srcSwitch.start()
await srcSwitch.connect(destSwitch.peerInfo.peerId, destSwitch.peerInfo.addrs)
check srcSwitch.isConnected(destSwitch.peerInfo.peerId)
await destSwitch.stop()
await srcSwitch.stop()
asyncTest "mount unstarted protocol":
proc handle(conn: Connection, proto: string) {.async.} =
check "test123" == string.fromBytes(await conn.readLp(1024))

View File

@@ -17,46 +17,42 @@ import ../libp2p/utility
suite "Utility":
test "successful safeConvert from int8 to int16":
let res = safeConvert[int16, int8]((-128).int8)
let res = safeConvert[int16]((-128).int8)
check res == -128'i16
test "unsuccessful safeConvert from int16 to int8":
check not (compiles do:
result: int8 = safeConvert[int8, int16](32767'i16))
result: int8 = safeConvert[int8](32767'i16))
test "successful safeConvert from uint8 to uint16":
let res: uint16 = safeConvert[uint16, uint8](255'u8)
let res: uint16 = safeConvert[uint16](255'u8)
check res == 255'u16
test "unsuccessful safeConvert from uint16 to uint8":
check not (compiles do:
result: uint8 = safeConvert[uint8, uint16](256'u16))
test "successful safeConvert from char to int":
let res: int = safeConvert[int, char]('A')
check res == 65
result: uint8 = safeConvert[uint8](256'u16))
test "unsuccessful safeConvert from int to char":
check not (compiles do:
result: char = safeConvert[char, int](128))
result: char = safeConvert[char](128))
test "successful safeConvert from bool to int":
let res: int = safeConvert[int, bool](true)
let res: int = safeConvert[int](true)
check res == 1
test "unsuccessful safeConvert from int to bool":
check not (compiles do:
result: bool = safeConvert[bool, int](2))
result: bool = safeConvert[bool](2))
test "successful safeConvert from enum to int":
type Color = enum red, green, blue
let res: int = safeConvert[int, Color](green)
let res: int = safeConvert[int](green)
check res == 1
test "unsuccessful safeConvert from int to enum":
type Color = enum red, green, blue
check not (compiles do:
result: Color = safeConvert[Color, int](3))
result: Color = safeConvert[Color](3))
test "successful safeConvert from range to int":
let res: int = safeConvert[int, range[1..10]](5)
@@ -68,11 +64,11 @@ suite "Utility":
test "unsuccessful safeConvert from int to uint":
check not (compiles do:
result: uint = safeConvert[uint, int](11))
result: uint = safeConvert[uint](11))
test "unsuccessful safeConvert from uint to int":
check not (compiles do:
result: uint = safeConvert[int, uint](11.uint))
result: uint = safeConvert[int](11.uint))
suite "withValue and valueOr templates":
type