mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 09:38:10 -05:00
Compare commits
19 Commits
webrtc-dir
...
stg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
05446734e3 | ||
|
|
64ef502c87 | ||
|
|
1fa30f07e8 | ||
|
|
39d0451a10 | ||
|
|
4dc7a89f45 | ||
|
|
fd26f93b80 | ||
|
|
dd2c74d413 | ||
|
|
b7e0df127f | ||
|
|
f591e692fc | ||
|
|
8855bce085 | ||
|
|
ed5670408b | ||
|
|
97192a3c80 | ||
|
|
294d06323c | ||
|
|
b2a75fc25e | ||
|
|
4b691b6374 | ||
|
|
8bb6215d8a | ||
|
|
f1b78f6be6 | ||
|
|
8377eb0362 | ||
|
|
35d1876ad8 |
8
.github/actions/install_nim/action.yml
vendored
8
.github/actions/install_nim/action.yml
vendored
@@ -6,7 +6,7 @@ inputs:
|
||||
cpu:
|
||||
description: "CPU to build for"
|
||||
default: "amd64"
|
||||
nim_branch:
|
||||
nim_ref:
|
||||
description: "Nim version"
|
||||
default: "version-1-6"
|
||||
shell:
|
||||
@@ -88,6 +88,8 @@ runs:
|
||||
run: |
|
||||
if [[ '${{ inputs.cpu }}' == 'amd64' ]]; then
|
||||
PLATFORM=x64
|
||||
elif [[ '${{ inputs.cpu }}' == 'arm64' ]]; then
|
||||
PLATFORM=arm64
|
||||
else
|
||||
PLATFORM=x86
|
||||
fi
|
||||
@@ -117,7 +119,7 @@ runs:
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: '${{ github.workspace }}/nim'
|
||||
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_branch }}-cache-${{ env.cache_nonce }}
|
||||
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_ref }}-cache-${{ env.cache_nonce }}
|
||||
|
||||
- name: Build Nim and Nimble
|
||||
shell: ${{ inputs.shell }}
|
||||
@@ -126,6 +128,6 @@ runs:
|
||||
# We don't want partial matches of the cache restored
|
||||
rm -rf nim
|
||||
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
|
||||
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ inputs.nim_branch }} \
|
||||
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ inputs.nim_ref }} \
|
||||
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
|
||||
bash build_nim.sh nim csources dist/nimble NimBinaries
|
||||
|
||||
12
.github/scripts/colors.sh
vendored
Normal file
12
.github/scripts/colors.sh
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# Colors
|
||||
export YLW='\033[1;33m'
|
||||
export RED='\033[0;31m'
|
||||
export GRN='\033[0;32m'
|
||||
export BLU='\033[0;34m'
|
||||
export BLD='\033[1m'
|
||||
export RST='\033[0m'
|
||||
|
||||
# Clear line
|
||||
export CLR='\033[2K'
|
||||
4
.github/scripts/commit_check.sh
vendored
Executable file
4
.github/scripts/commit_check.sh
vendored
Executable file
@@ -0,0 +1,4 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
source .github/scripts/parse_commits.sh
|
||||
parse_commits "$@"
|
||||
30
.github/scripts/parse_commits.sh
vendored
Normal file
30
.github/scripts/parse_commits.sh
vendored
Normal file
@@ -0,0 +1,30 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# The output of this script is as follows:
|
||||
# 1. One line "checking commits between: <start_commit> <end_commit>"
|
||||
# 2. One line for each commit message that is not well-formed
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
source .github/scripts/colors.sh
|
||||
|
||||
parse_commits() {
|
||||
|
||||
BASE_BRANCH=${BASE_BRANCH:-master}
|
||||
|
||||
start_commit=${1:-origin/${BASE_BRANCH}}
|
||||
end_commit=${2:-HEAD}
|
||||
exit_code=0
|
||||
|
||||
echo -e "${GRN}Checking commits between:${RST} $start_commit $end_commit"
|
||||
# Run the loop in the current shell using process substitution
|
||||
while IFS= read -r message || [ -n "$message" ]; do
|
||||
# Check if commit message follows conventional commits format
|
||||
if [[ ! $message =~ ^(build|chore|ci|docs|feat|fix|perf|refactor|revert|style|test)(\(.*\))?:.*$ ]]; then
|
||||
echo -e "${YLW}Commit message is ill-formed:${RST} $message"
|
||||
exit_code=1
|
||||
fi
|
||||
done < <(git log --format=%s "$start_commit".."$end_commit")
|
||||
|
||||
exit ${exit_code}
|
||||
}
|
||||
12
.github/workflows/auto_assign_pr.yml
vendored
Normal file
12
.github/workflows/auto_assign_pr.yml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
name: Auto Assign PR to Creator
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types:
|
||||
- opened
|
||||
|
||||
jobs:
|
||||
assign_creator:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: toshimaru/auto-author-assign@v1.6.2
|
||||
25
.github/workflows/ci.yml
vendored
25
.github/workflows/ci.yml
vendored
@@ -27,12 +27,14 @@ jobs:
|
||||
cpu: amd64
|
||||
- os: macos
|
||||
cpu: amd64
|
||||
- os: macos-14
|
||||
cpu: arm64
|
||||
- os: windows
|
||||
cpu: amd64
|
||||
nim:
|
||||
- branch: version-1-6
|
||||
nim:
|
||||
- ref: version-1-6
|
||||
memory_management: refc
|
||||
- branch: version-2-0
|
||||
- ref: version-2-0
|
||||
memory_management: refc
|
||||
include:
|
||||
- platform:
|
||||
@@ -47,6 +49,10 @@ jobs:
|
||||
os: macos
|
||||
builder: macos-13
|
||||
shell: bash
|
||||
- platform:
|
||||
os: macos-14
|
||||
builder: macos-14
|
||||
shell: bash
|
||||
- platform:
|
||||
os: windows
|
||||
builder: windows-2022
|
||||
@@ -56,7 +62,7 @@ jobs:
|
||||
run:
|
||||
shell: ${{ matrix.shell }}
|
||||
|
||||
name: '${{ matrix.platform.os }}-${{ matrix.platform.cpu }} (Nim ${{ matrix.nim.branch }})'
|
||||
name: '${{ matrix.platform.os }}-${{ matrix.platform.cpu }} (Nim ${{ matrix.nim.ref }})'
|
||||
runs-on: ${{ matrix.builder }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -70,12 +76,12 @@ jobs:
|
||||
os: ${{ matrix.platform.os }}
|
||||
cpu: ${{ matrix.platform.cpu }}
|
||||
shell: ${{ matrix.shell }}
|
||||
nim_branch: ${{ matrix.nim.branch }}
|
||||
nim_ref: ${{ matrix.nim.ref }}
|
||||
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '~1.15.5'
|
||||
go-version: '~1.16.0' # That's the minimum Go version that works with arm.
|
||||
|
||||
- name: Install p2pd
|
||||
run: |
|
||||
@@ -86,9 +92,9 @@ jobs:
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: nimbledeps
|
||||
# Using nim.branch as a simple way to differentiate between nimble using the "pkgs" or "pkgs2" directories.
|
||||
# The change happened on Nimble v0.14.0.
|
||||
key: nimbledeps-${{ matrix.nim.branch }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
|
||||
# Using nim.ref as a simple way to differentiate between nimble using the "pkgs" or "pkgs2" directories.
|
||||
# The change happened on Nimble v0.14.0. Also forcing the deps to be reinstalled on each os and cpu.
|
||||
key: nimbledeps-${{ matrix.nim.ref }}-${{ matrix.builder }}-${{ matrix.platform.cpu }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
|
||||
|
||||
- name: Install deps
|
||||
if: ${{ steps.deps-cache.outputs.cache-hit != 'true' }}
|
||||
@@ -109,5 +115,6 @@ jobs:
|
||||
nim --version
|
||||
nimble --version
|
||||
gcc --version
|
||||
|
||||
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }}"
|
||||
nimble test
|
||||
|
||||
77
.github/workflows/commit_lint.yml
vendored
Normal file
77
.github/workflows/commit_lint.yml
vendored
Normal file
@@ -0,0 +1,77 @@
|
||||
name: "Conventional Commits"
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types:
|
||||
- opened
|
||||
- edited
|
||||
- reopened
|
||||
- synchronize
|
||||
jobs:
|
||||
main:
|
||||
name: Validate commit messages
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
pull-requests: write
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
ref: ${{github.event.pull_request.head.ref}}
|
||||
repository: ${{github.event.pull_request.head.repo.full_name}}
|
||||
|
||||
- name: Check commit message
|
||||
id: check_commit_message
|
||||
if: always()
|
||||
run: |
|
||||
set +e
|
||||
|
||||
base_sha=${{ github.event.pull_request.base.sha }}
|
||||
head_sha=${{ github.event.pull_request.head.sha }}
|
||||
|
||||
output=$(.github/scripts/commit_check.sh "${base_sha}" "${head_sha}" 2>&1)
|
||||
exit_code=$?
|
||||
|
||||
echo "${output}" | sed '$d'
|
||||
echo "exit_code=${exit_code}" >> $GITHUB_OUTPUT
|
||||
|
||||
invalid_commit_messages=$(echo "${output}" | sed '1d;$d')
|
||||
invalid_commit_messages=$(echo "${output}" | sed '1d;$d')
|
||||
invalid_commit_messages=$(echo "${invalid_commit_messages}" | sed 's/\x1b\[[0-9;]*m//g') # Remove color codes
|
||||
invalid_commit_messages=$(echo "${invalid_commit_messages}" | sed 's/^Commit message is ill-formed: //') # Remove prefix
|
||||
|
||||
if [[ $exit_code -ne 0 ]]; then
|
||||
EOF=$(dd if=/dev/urandom bs=15 count=1 status=none | base64)
|
||||
echo "error_message<<$EOF" >> "$GITHUB_ENV"
|
||||
echo "${invalid_commit_messages}" >> "$GITHUB_ENV"
|
||||
echo "$EOF" >> "$GITHUB_ENV"
|
||||
fi
|
||||
|
||||
- name: "Publish failed commit messages"
|
||||
uses: marocchino/sticky-pull-request-comment@v2
|
||||
# When the previous steps fails, the workflow would stop. By adding this
|
||||
# condition you can continue the execution with the populated error message.
|
||||
if: always() && (steps.check_commit_message.outputs.exit_code != 0)
|
||||
with:
|
||||
header: commit-message-lint-error
|
||||
message: |
|
||||
Commits must follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/)
|
||||
Please fix these commit messages:
|
||||
```
|
||||
${{ env.error_message }}
|
||||
```
|
||||
|
||||
# Delete a previous comment when the issue has been resolved
|
||||
- name: "Delete previous comment"
|
||||
if: ${{ steps.check_commit_message.outputs.exit_code == 0 }}
|
||||
uses: marocchino/sticky-pull-request-comment@v2
|
||||
with:
|
||||
header: commit-message-lint-error
|
||||
delete: true
|
||||
|
||||
- name: "Mark as failed"
|
||||
if: steps.check_commit_message.outputs.exit_code != 0
|
||||
uses: actions/github-script@v7
|
||||
with:
|
||||
script: |
|
||||
core.setFailed("Some commit messages are ill-formed")
|
||||
2
.github/workflows/daily_amd64.yml
vendored
2
.github/workflows/daily_amd64.yml
vendored
@@ -10,5 +10,5 @@ jobs:
|
||||
name: Daily amd64
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim: "[{'branch': 'version-1-6', 'memory_management': 'refc'}, {'branch': 'version-2-0', 'memory_management': 'refc'}]"
|
||||
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': 'version-2-0', 'memory_management': 'refc'}]"
|
||||
cpu: "['amd64']"
|
||||
|
||||
8
.github/workflows/daily_common.yml
vendored
8
.github/workflows/daily_common.yml
vendored
@@ -7,7 +7,7 @@ on:
|
||||
nim:
|
||||
description: 'Nim Configuration'
|
||||
required: true
|
||||
type: string # Following this format: [{"branch": ..., "memory_management": ...}, ...]
|
||||
type: string # Following this format: [{"ref": ..., "memory_management": ...}, ...]
|
||||
cpu:
|
||||
description: 'CPU'
|
||||
required: true
|
||||
@@ -58,7 +58,7 @@ jobs:
|
||||
run:
|
||||
shell: ${{ matrix.platform.shell }}
|
||||
|
||||
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.branch }})'
|
||||
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.ref }})'
|
||||
runs-on: ${{ matrix.platform.builder }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -69,13 +69,13 @@ jobs:
|
||||
with:
|
||||
os: ${{ matrix.platform.os }}
|
||||
shell: ${{ matrix.platform.shell }}
|
||||
nim_branch: ${{ matrix.nim.branch }}
|
||||
nim_ref: ${{ matrix.nim.ref }}
|
||||
cpu: ${{ matrix.cpu }}
|
||||
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '~1.15.5'
|
||||
go-version: '~1.16.0'
|
||||
cache: false
|
||||
|
||||
- name: Install p2pd
|
||||
|
||||
2
.github/workflows/daily_devel.yml
vendored
2
.github/workflows/daily_devel.yml
vendored
@@ -10,5 +10,5 @@ jobs:
|
||||
name: Daily Nim Devel
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim: "[{'branch': 'devel', 'memory_management': 'orc'}]"
|
||||
nim: "[{'ref': 'devel', 'memory_management': 'orc'}]"
|
||||
cpu: "['amd64']"
|
||||
|
||||
2
.github/workflows/daily_i386.yml
vendored
2
.github/workflows/daily_i386.yml
vendored
@@ -10,6 +10,6 @@ jobs:
|
||||
name: Daily i386 (Linux)
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim: "[{'branch': 'version-1-6', 'memory_management': 'refc'}, {'branch': 'version-2-0', 'memory_management': 'refc'}, {'branch': 'devel', 'memory_management': 'orc'}]"
|
||||
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': 'version-2-0', 'memory_management': 'refc'}, {'ref': 'devel', 'memory_management': 'orc'}]"
|
||||
cpu: "['i386']"
|
||||
exclude: "[{'platform': {'os':'macos'}}, {'platform': {'os':'windows'}}]"
|
||||
|
||||
2
.github/workflows/daily_sat.yml
vendored
2
.github/workflows/daily_sat.yml
vendored
@@ -10,6 +10,6 @@ jobs:
|
||||
name: Daily SAT
|
||||
uses: ./.github/workflows/daily_common.yml
|
||||
with:
|
||||
nim: "[{'branch': 'version-2-0', 'memory_management': 'refc'}]"
|
||||
nim: "[{'ref': 'version-2-0', 'memory_management': 'refc'}]"
|
||||
cpu: "['amd64']"
|
||||
use_sat_solver: true
|
||||
|
||||
35
.github/workflows/pr_lint.yml
vendored
Normal file
35
.github/workflows/pr_lint.yml
vendored
Normal file
@@ -0,0 +1,35 @@
|
||||
name: "Conventional Commits"
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types:
|
||||
- opened
|
||||
- edited
|
||||
- reopened
|
||||
- synchronize
|
||||
jobs:
|
||||
main:
|
||||
name: Validate PR title
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
pull-requests: write
|
||||
steps:
|
||||
- uses: amannn/action-semantic-pull-request@v5
|
||||
id: lint_pr_title
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: marocchino/sticky-pull-request-comment@v2
|
||||
# When the previous steps fails, the workflow would stop. By adding this
|
||||
# condition you can continue the execution with the populated error message.
|
||||
if: always() && (steps.lint_pr_title.outputs.error_message != null)
|
||||
with:
|
||||
header: pr-title-lint-error
|
||||
message: |
|
||||
Pull requests titles must follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/)
|
||||
|
||||
# Delete a previous comment when the issue has been resolved
|
||||
- if: ${{ steps.lint_pr_title.outputs.error_message == null }}
|
||||
uses: marocchino/sticky-pull-request-comment@v2
|
||||
with:
|
||||
header: pr-title-lint-error
|
||||
delete: true
|
||||
@@ -13,7 +13,7 @@ For more information about the go daemon, check out [this repository](https://gi
|
||||
> **Required only** for running the tests.
|
||||
|
||||
# Prerequisites
|
||||
Go with version `1.15.15`.
|
||||
Go with version `1.16.0`.
|
||||
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.
|
||||
|
||||
# Installation
|
||||
@@ -21,7 +21,7 @@ Follow one of the methods below:
|
||||
|
||||
## Script
|
||||
Run the build script while having the `go` command pointing to the correct Go version.
|
||||
We recommend using `1.15.15`, as previously stated.
|
||||
We recommend using `1.16.0`, as previously stated.
|
||||
```sh
|
||||
./scripts/build_p2pd.sh
|
||||
```
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
mode = ScriptMode.Verbose
|
||||
|
||||
packageName = "libp2p"
|
||||
version = "1.6.0"
|
||||
version = "1.7.1"
|
||||
author = "Status Research & Development GmbH"
|
||||
description = "LibP2P implementation"
|
||||
license = "MIT"
|
||||
@@ -125,9 +125,8 @@ task examples_build, "Build the samples":
|
||||
buildSample("tutorial_3_protobuf", true)
|
||||
buildSample("tutorial_4_gossipsub", true)
|
||||
buildSample("tutorial_5_discovery", true)
|
||||
exec "nimble install -y nimpng@#HEAD"
|
||||
# this is to fix broken build on 1.7.3, remove it when nimpng version 0.3.2 or later is released
|
||||
exec "nimble install -y nico@#af99dd60bf2b395038ece815ea1012330a80d6e6"
|
||||
exec "nimble install -y nimpng"
|
||||
exec "nimble install -y nico --passNim=--skipParentCfg"
|
||||
buildSample("tutorial_6_game", false, "--styleCheck:off")
|
||||
|
||||
# pin system
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Nim-LibP2P
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
@@ -10,7 +10,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[streams, strutils, sets, sequtils],
|
||||
std/[streams, sets, sequtils],
|
||||
chronos,
|
||||
chronicles,
|
||||
stew/byteutils,
|
||||
@@ -39,24 +39,32 @@ proc questionToBuf(address: string, kind: QKind): seq[byte] =
|
||||
|
||||
var buf = newSeq[byte](dataLen)
|
||||
discard requestStream.readData(addr buf[0], dataLen)
|
||||
return buf
|
||||
except CatchableError as exc:
|
||||
buf
|
||||
except IOError as exc:
|
||||
info "Failed to created DNS buffer", description = exc.msg
|
||||
return newSeq[byte](0)
|
||||
newSeq[byte](0)
|
||||
except OSError as exc:
|
||||
info "Failed to created DNS buffer", description = exc.msg
|
||||
newSeq[byte](0)
|
||||
except ValueError as exc:
|
||||
info "Failed to created DNS buffer", description = exc.msg
|
||||
newSeq[byte](0)
|
||||
|
||||
proc getDnsResponse(
|
||||
dnsServer: TransportAddress, address: string, kind: QKind
|
||||
): Future[Response] {.async.} =
|
||||
): Future[Response] {.
|
||||
async: (raises: [CancelledError, IOError, OSError, TransportError, ValueError])
|
||||
.} =
|
||||
var sendBuf = questionToBuf(address, kind)
|
||||
|
||||
if sendBuf.len == 0:
|
||||
raise newException(ValueError, "Incorrect DNS query")
|
||||
|
||||
let receivedDataFuture = newFuture[void]()
|
||||
let receivedDataFuture = Future[void].Raising([CancelledError]).init()
|
||||
|
||||
proc datagramDataReceived(
|
||||
transp: DatagramTransport, raddr: TransportAddress
|
||||
): Future[void] {.async, closure.} =
|
||||
): Future[void] {.async: (raises: []), closure.} =
|
||||
receivedDataFuture.complete()
|
||||
|
||||
let sock =
|
||||
@@ -68,27 +76,41 @@ proc getDnsResponse(
|
||||
try:
|
||||
await sock.sendTo(dnsServer, addr sendBuf[0], sendBuf.len)
|
||||
|
||||
await receivedDataFuture or sleepAsync(5.seconds) #unix default
|
||||
|
||||
if not receivedDataFuture.finished:
|
||||
try:
|
||||
await receivedDataFuture.wait(5.seconds) #unix default
|
||||
except AsyncTimeoutError:
|
||||
raise newException(IOError, "DNS server timeout")
|
||||
|
||||
let rawResponse = sock.getMessage()
|
||||
# parseResponse can has a raises: [Exception, ..] because of
|
||||
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
|
||||
# it can't actually raise though
|
||||
return exceptionToAssert:
|
||||
try:
|
||||
parseResponse(string.fromBytes(rawResponse))
|
||||
except IOError as exc:
|
||||
raise exc
|
||||
except OSError as exc:
|
||||
raise exc
|
||||
except ValueError as exc:
|
||||
raise exc
|
||||
except Exception as exc:
|
||||
# Nim 1.6: parseResponse can has a raises: [Exception, ..] because of
|
||||
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
|
||||
# it can't actually raise though
|
||||
raiseAssert exc.msg
|
||||
finally:
|
||||
await sock.closeWait()
|
||||
|
||||
method resolveIp*(
|
||||
self: DnsResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
|
||||
): Future[seq[TransportAddress]] {.async.} =
|
||||
): Future[seq[TransportAddress]] {.
|
||||
async: (raises: [CancelledError, TransportAddressError])
|
||||
.} =
|
||||
trace "Resolving IP using DNS", address, servers = self.nameServers.mapIt($it), domain
|
||||
for _ in 0 ..< self.nameServers.len:
|
||||
let server = self.nameServers[0]
|
||||
var responseFutures: seq[Future[Response]]
|
||||
var responseFutures: seq[
|
||||
Future[Response].Raising(
|
||||
[CancelledError, IOError, OSError, TransportError, ValueError]
|
||||
)
|
||||
]
|
||||
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
|
||||
responseFutures.add(getDnsResponse(server, address, A))
|
||||
|
||||
@@ -103,23 +125,32 @@ method resolveIp*(
|
||||
var
|
||||
resolvedAddresses: OrderedSet[string]
|
||||
resolveFailed = false
|
||||
template handleFail(e): untyped =
|
||||
info "Failed to query DNS", address, error = e.msg
|
||||
resolveFailed = true
|
||||
break
|
||||
|
||||
for fut in responseFutures:
|
||||
try:
|
||||
let resp = await fut
|
||||
for answer in resp.answers:
|
||||
# toString can has a raises: [Exception, ..] because of
|
||||
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
|
||||
# it can't actually raise though
|
||||
resolvedAddresses.incl(exceptionToAssert(answer.toString()))
|
||||
resolvedAddresses.incl(answer.toString())
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except ValueError as e:
|
||||
info "Invalid DNS query", address, error = e.msg
|
||||
return @[]
|
||||
except CatchableError as e:
|
||||
info "Failed to query DNS", address, error = e.msg
|
||||
resolveFailed = true
|
||||
break
|
||||
except IOError as e:
|
||||
handleFail(e)
|
||||
except OSError as e:
|
||||
handleFail(e)
|
||||
except TransportError as e:
|
||||
handleFail(e)
|
||||
except Exception as e:
|
||||
# Nim 1.6: answer.toString can has a raises: [Exception, ..] because of
|
||||
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
|
||||
# it can't actually raise though
|
||||
raiseAssert e.msg
|
||||
|
||||
if resolveFailed:
|
||||
self.nameServers.add(self.nameServers[0])
|
||||
@@ -132,27 +163,39 @@ method resolveIp*(
|
||||
debug "Failed to resolve address, returning empty set"
|
||||
return @[]
|
||||
|
||||
method resolveTxt*(self: DnsResolver, address: string): Future[seq[string]] {.async.} =
|
||||
method resolveTxt*(
|
||||
self: DnsResolver, address: string
|
||||
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
|
||||
trace "Resolving TXT using DNS", address, servers = self.nameServers.mapIt($it)
|
||||
for _ in 0 ..< self.nameServers.len:
|
||||
let server = self.nameServers[0]
|
||||
try:
|
||||
# toString can has a raises: [Exception, ..] because of
|
||||
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
|
||||
# it can't actually raise though
|
||||
let response = await getDnsResponse(server, address, TXT)
|
||||
return exceptionToAssert:
|
||||
trace "Got TXT response",
|
||||
server = $server, answer = response.answers.mapIt(it.toString())
|
||||
response.answers.mapIt(it.toString())
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError as e:
|
||||
template handleFail(e): untyped =
|
||||
info "Failed to query DNS", address, error = e.msg
|
||||
self.nameServers.add(self.nameServers[0])
|
||||
self.nameServers.delete(0)
|
||||
continue
|
||||
|
||||
try:
|
||||
let response = await getDnsResponse(server, address, TXT)
|
||||
trace "Got TXT response",
|
||||
server = $server, answer = response.answers.mapIt(it.toString())
|
||||
return response.answers.mapIt(it.toString())
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except IOError as e:
|
||||
handleFail(e)
|
||||
except OSError as e:
|
||||
handleFail(e)
|
||||
except TransportError as e:
|
||||
handleFail(e)
|
||||
except ValueError as e:
|
||||
handleFail(e)
|
||||
except Exception as e:
|
||||
# Nim 1.6: toString can has a raises: [Exception, ..] because of
|
||||
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
|
||||
# it can't actually raise though
|
||||
raiseAssert e.msg
|
||||
|
||||
debug "Failed to resolve TXT, returning empty set"
|
||||
return @[]
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Nim-LibP2P
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
@@ -25,17 +25,25 @@ type MockResolver* = ref object of NameResolver
|
||||
|
||||
method resolveIp*(
|
||||
self: MockResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
|
||||
): Future[seq[TransportAddress]] {.async.} =
|
||||
): Future[seq[TransportAddress]] {.
|
||||
async: (raises: [CancelledError, TransportAddressError])
|
||||
.} =
|
||||
var res: seq[TransportAddress]
|
||||
|
||||
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
|
||||
for resp in self.ipResponses.getOrDefault((address, false)):
|
||||
result.add(initTAddress(resp, port))
|
||||
res.add(initTAddress(resp, port))
|
||||
|
||||
if domain == Domain.AF_INET6 or domain == Domain.AF_UNSPEC:
|
||||
for resp in self.ipResponses.getOrDefault((address, true)):
|
||||
result.add(initTAddress(resp, port))
|
||||
res.add(initTAddress(resp, port))
|
||||
|
||||
method resolveTxt*(self: MockResolver, address: string): Future[seq[string]] {.async.} =
|
||||
return self.txtResponses.getOrDefault(address)
|
||||
res
|
||||
|
||||
method resolveTxt*(
|
||||
self: MockResolver, address: string
|
||||
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
|
||||
self.txtResponses.getOrDefault(address)
|
||||
|
||||
proc new*(T: typedesc[MockResolver]): T =
|
||||
T()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Nim-LibP2P
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[sugar, sets, sequtils, strutils]
|
||||
import std/[sets, sequtils, strutils]
|
||||
import chronos, chronicles, stew/endians2
|
||||
import ".."/[multiaddress, multicodec]
|
||||
|
||||
@@ -20,19 +20,17 @@ type NameResolver* = ref object of RootObj
|
||||
|
||||
method resolveTxt*(
|
||||
self: NameResolver, address: string
|
||||
): Future[seq[string]] {.async, base.} =
|
||||
): Future[seq[string]] {.async: (raises: [CancelledError]), base.} =
|
||||
## Get TXT record
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
raiseAssert "Not implemented!"
|
||||
|
||||
method resolveIp*(
|
||||
self: NameResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
|
||||
): Future[seq[TransportAddress]] {.async, base.} =
|
||||
): Future[seq[TransportAddress]] {.
|
||||
async: (raises: [CancelledError, TransportAddressError]), base
|
||||
.} =
|
||||
## Resolve the specified address
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
raiseAssert "Not implemented!"
|
||||
|
||||
proc getHostname*(ma: MultiAddress): string =
|
||||
let
|
||||
@@ -46,30 +44,40 @@ proc getHostname*(ma: MultiAddress): string =
|
||||
|
||||
proc resolveOneAddress(
|
||||
self: NameResolver, ma: MultiAddress, domain: Domain = Domain.AF_UNSPEC, prefix = ""
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
#Resolve a single address
|
||||
): Future[seq[MultiAddress]] {.
|
||||
async: (raises: [CancelledError, MaError, TransportAddressError])
|
||||
.} =
|
||||
# Resolve a single address
|
||||
let portPart = ma[1].valueOr:
|
||||
raise maErr error
|
||||
var pbuf: array[2, byte]
|
||||
|
||||
var dnsval = getHostname(ma)
|
||||
|
||||
if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0:
|
||||
raise newException(MaError, "Incorrect port number")
|
||||
let plen = portPart.protoArgument(pbuf).valueOr:
|
||||
raise maErr error
|
||||
if plen == 0:
|
||||
raise maErr "Incorrect port number"
|
||||
let
|
||||
port = Port(fromBytesBE(uint16, pbuf))
|
||||
dnsval = getHostname(ma)
|
||||
resolvedAddresses = await self.resolveIp(prefix & dnsval, port, domain)
|
||||
|
||||
return collect(newSeqOfCap(4)):
|
||||
for address in resolvedAddresses:
|
||||
var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet()
|
||||
for part in ma:
|
||||
if DNS.match(part.tryGet()):
|
||||
continue
|
||||
createdAddress &= part.tryGet()
|
||||
createdAddress
|
||||
resolvedAddresses.mapIt:
|
||||
let address = MultiAddress.init(it).valueOr:
|
||||
raise maErr error
|
||||
var createdAddress = address[0].valueOr:
|
||||
raise maErr error
|
||||
for part in ma:
|
||||
let part = part.valueOr:
|
||||
raise maErr error
|
||||
if DNS.match(part):
|
||||
continue
|
||||
createdAddress &= part
|
||||
createdAddress
|
||||
|
||||
proc resolveDnsAddr*(
|
||||
self: NameResolver, ma: MultiAddress, depth: int = 0
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
): Future[seq[MultiAddress]] {.
|
||||
async: (raises: [CancelledError, MaError, TransportAddressError])
|
||||
.} =
|
||||
if not DNSADDR.matchPartial(ma):
|
||||
return @[ma]
|
||||
|
||||
@@ -78,54 +86,67 @@ proc resolveDnsAddr*(
|
||||
info "Stopping DNSADDR recursion, probably malicious", ma
|
||||
return @[]
|
||||
|
||||
var dnsval = getHostname(ma)
|
||||
|
||||
let txt = await self.resolveTxt("_dnsaddr." & dnsval)
|
||||
let
|
||||
dnsval = getHostname(ma)
|
||||
txt = await self.resolveTxt("_dnsaddr." & dnsval)
|
||||
|
||||
trace "txt entries", txt
|
||||
|
||||
var result: seq[MultiAddress]
|
||||
const codec = multiCodec("p2p")
|
||||
let maCodec = block:
|
||||
let hasCodec = ma.contains(codec).valueOr:
|
||||
raise maErr error
|
||||
if hasCodec:
|
||||
ma[codec]
|
||||
else:
|
||||
(static(default(MaResult[MultiAddress])))
|
||||
|
||||
var res: seq[MultiAddress]
|
||||
for entry in txt:
|
||||
if not entry.startsWith("dnsaddr="):
|
||||
continue
|
||||
let entryValue = MultiAddress.init(entry[8 ..^ 1]).tryGet()
|
||||
|
||||
if entryValue.contains(multiCodec("p2p")).tryGet() and
|
||||
ma.contains(multiCodec("p2p")).tryGet():
|
||||
if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]:
|
||||
continue
|
||||
let
|
||||
entryValue = MultiAddress.init(entry[8 ..^ 1]).valueOr:
|
||||
raise maErr error
|
||||
entryHasCodec = entryValue.contains(multiCodec("p2p")).valueOr:
|
||||
raise maErr error
|
||||
if entryHasCodec and maCodec.isOk and entryValue[codec] != maCodec:
|
||||
continue
|
||||
|
||||
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
|
||||
for r in resolved:
|
||||
result.add(r)
|
||||
res.add(r)
|
||||
|
||||
if result.len == 0:
|
||||
if res.len == 0:
|
||||
debug "Failed to resolve a DNSADDR", ma
|
||||
return @[]
|
||||
return result
|
||||
res
|
||||
|
||||
proc resolveMAddress*(
|
||||
self: NameResolver, address: MultiAddress
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
): Future[seq[MultiAddress]] {.
|
||||
async: (raises: [CancelledError, MaError, TransportAddressError])
|
||||
.} =
|
||||
var res = initOrderedSet[MultiAddress]()
|
||||
|
||||
if not DNS.matchPartial(address):
|
||||
res.incl(address)
|
||||
else:
|
||||
let code = address[0].tryGet().protoCode().tryGet()
|
||||
let seq =
|
||||
case code
|
||||
of multiCodec("dns"):
|
||||
await self.resolveOneAddress(address)
|
||||
of multiCodec("dns4"):
|
||||
await self.resolveOneAddress(address, Domain.AF_INET)
|
||||
of multiCodec("dns6"):
|
||||
await self.resolveOneAddress(address, Domain.AF_INET6)
|
||||
of multiCodec("dnsaddr"):
|
||||
await self.resolveDnsAddr(address)
|
||||
else:
|
||||
assert false
|
||||
@[address]
|
||||
for ad in seq:
|
||||
let
|
||||
firstPart = address[0].valueOr:
|
||||
raise maErr error
|
||||
code = firstPart.protoCode().valueOr:
|
||||
raise maErr error
|
||||
ads =
|
||||
case code
|
||||
of multiCodec("dns"):
|
||||
await self.resolveOneAddress(address)
|
||||
of multiCodec("dns4"):
|
||||
await self.resolveOneAddress(address, Domain.AF_INET)
|
||||
of multiCodec("dns6"):
|
||||
await self.resolveOneAddress(address, Domain.AF_INET6)
|
||||
of multiCodec("dnsaddr"):
|
||||
await self.resolveDnsAddr(address)
|
||||
else:
|
||||
raise maErr("Unsupported codec " & $code)
|
||||
for ad in ads:
|
||||
res.incl(ad)
|
||||
return res.toSeq
|
||||
res.toSeq
|
||||
|
||||
@@ -63,6 +63,7 @@ type
|
||||
KeyBook* {.public.} = ref object of PeerBook[PublicKey]
|
||||
|
||||
AgentBook* {.public.} = ref object of PeerBook[string]
|
||||
LastSeenBook* {.public.} = ref object of PeerBook[Opt[MultiAddress]]
|
||||
ProtoVersionBook* {.public.} = ref object of PeerBook[string]
|
||||
SPRBook* {.public.} = ref object of PeerBook[Envelope]
|
||||
|
||||
@@ -145,10 +146,16 @@ proc del*(peerStore: PeerStore, peerId: PeerId) {.public.} =
|
||||
for _, book in peerStore.books:
|
||||
book.deletor(peerId)
|
||||
|
||||
proc updatePeerInfo*(peerStore: PeerStore, info: IdentifyInfo) =
|
||||
if info.addrs.len > 0:
|
||||
proc updatePeerInfo*(
|
||||
peerStore: PeerStore,
|
||||
info: IdentifyInfo,
|
||||
observedAddr: Opt[MultiAddress] = Opt.none(MultiAddress),
|
||||
) =
|
||||
if len(info.addrs) > 0:
|
||||
peerStore[AddressBook][info.peerId] = info.addrs
|
||||
|
||||
peerStore[LastSeenBook][info.peerId] = observedAddr
|
||||
|
||||
info.pubkey.withValue(pubkey):
|
||||
peerStore[KeyBook][info.peerId] = pubkey
|
||||
|
||||
@@ -200,7 +207,7 @@ proc identify*(peerStore: PeerStore, muxer: Muxer) {.async.} =
|
||||
knownAgent = shortAgent
|
||||
muxer.connection.setShortAgent(knownAgent)
|
||||
|
||||
peerStore.updatePeerInfo(info)
|
||||
peerStore.updatePeerInfo(info, stream.observedAddr)
|
||||
finally:
|
||||
await stream.closeWithEOF()
|
||||
|
||||
|
||||
@@ -36,6 +36,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
|
||||
|
||||
export types, scoring, behavior, pubsub
|
||||
|
||||
import std/atomics
|
||||
const WARMUP_THRESHOLD = 2
|
||||
var
|
||||
lma_dup_during_validation: Atomic[uint32] # number of duplicates during 1st message validation
|
||||
lma_idontwant_saves: Atomic[uint32] # number of Txs saved due to idontwant
|
||||
lma_duplicate_count: Atomic[uint32] # number of duplicate messages received
|
||||
lma_iwants_sent: Atomic[uint32] # number of iwant requests sent
|
||||
lma_iwants_replied: Atomic[uint32] # number of iwant messages that are replied
|
||||
lma_imreceiving_saves: Atomic[uint32] # number of messages saved due to imreceiving message
|
||||
lma_unique_receives: Atomic[uint32] # number of unique messages received
|
||||
lma_mesh_recvs_aftar_iwant: Atomic[uint32] # messages received from mesh, after sending iwant request
|
||||
lma_warmup_messages: Atomic[uint32] # dont issue idontwant during if < WARMUP_THRESHOLD
|
||||
|
||||
lma_dup_during_validation.store(0)
|
||||
lma_idontwant_saves.store(0)
|
||||
lma_duplicate_count.store(0)
|
||||
lma_iwants_sent.store(0)
|
||||
lma_iwants_replied.store(0)
|
||||
lma_imreceiving_saves.store(0)
|
||||
lma_unique_receives.store(0)
|
||||
lma_mesh_recvs_aftar_iwant.store(0)
|
||||
lma_warmup_messages.store(0)
|
||||
|
||||
export lma_dup_during_validation, lma_idontwant_saves, lma_duplicate_count, lma_iwants_sent,
|
||||
lma_iwants_replied, lma_imreceiving_saves, lma_unique_receives, lma_mesh_recvs_aftar_iwant
|
||||
|
||||
logScope:
|
||||
topics = "libp2p gossipsub"
|
||||
|
||||
@@ -226,6 +252,7 @@ method init*(g: GossipSub) =
|
||||
g.codecs &= GossipSubCodec_12
|
||||
g.codecs &= GossipSubCodec_11
|
||||
g.codecs &= GossipSubCodec_10
|
||||
g.iwantsRequested = initHashSet[MessageId]()
|
||||
|
||||
method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
|
||||
g.withPeerStats(peer.peerId) do(stats: var PeerStats):
|
||||
@@ -347,6 +374,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
|
||||
var respControl: ControlMessage
|
||||
g.handleIDontWant(peer, control.idontwant)
|
||||
g.handleIMReceiving(peer, control.imreceiving)
|
||||
let iwant = g.handleIHave(peer, control.ihave)
|
||||
if iwant.messageIDs.len > 0:
|
||||
respControl.iwant.add(iwant)
|
||||
@@ -360,6 +388,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
if isPruneNotEmpty or isIWantNotEmpty:
|
||||
if isIWantNotEmpty:
|
||||
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
|
||||
lma_iwants_sent.atomicInc(respControl.iwant.len.uint32)
|
||||
|
||||
if isPruneNotEmpty:
|
||||
for prune in respControl.prune:
|
||||
@@ -381,6 +410,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
|
||||
# iwant replies have lower priority
|
||||
trace "sending iwant reply messages", peer
|
||||
lma_iwants_replied.atomicInc(messages.len.uint32)
|
||||
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
|
||||
|
||||
proc validateAndRelay(
|
||||
@@ -397,6 +427,8 @@ proc validateAndRelay(
|
||||
toSendPeers.incl(peers[])
|
||||
g.subscribedDirectPeers.withValue(topic, peers):
|
||||
toSendPeers.incl(peers[])
|
||||
if not (peer in toSendPeers):
|
||||
lma_mesh_recvs_aftar_iwant.atomicInc()
|
||||
toSendPeers.excl(peer)
|
||||
|
||||
if msg.data.len > max(512, msgId.len * 10):
|
||||
@@ -409,25 +441,39 @@ proc validateAndRelay(
|
||||
# descored) and that the savings from honest peers are greater than the
|
||||
# cost a dishonest peer can incur in short time (since the IDONTWANT is
|
||||
# small).
|
||||
var peersToSendIDontWant = HashSet[PubSubPeer]()
|
||||
addToSendPeers(peersToSendIDontWant)
|
||||
peersToSendIDontWant.exclIfIt(
|
||||
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
|
||||
)
|
||||
g.broadcast(
|
||||
peersToSendIDontWant,
|
||||
RPCMsg(
|
||||
control:
|
||||
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
|
||||
),
|
||||
isHighPriority = true,
|
||||
)
|
||||
if lma_warmup_messages.load() < WARMUP_THRESHOLD:
|
||||
lma_warmup_messages.atomicInc()
|
||||
if lma_warmup_messages.load() == WARMUP_THRESHOLD:
|
||||
lma_dup_during_validation.store(0)
|
||||
lma_idontwant_saves.store(0)
|
||||
lma_duplicate_count.store(0)
|
||||
lma_iwants_sent.store(0)
|
||||
lma_iwants_replied.store(0)
|
||||
lma_imreceiving_saves.store(0)
|
||||
lma_unique_receives.store(0)
|
||||
lma_mesh_recvs_aftar_iwant.store(0)
|
||||
|
||||
else:
|
||||
var peersToSendIDontWant = HashSet[PubSubPeer]()
|
||||
addToSendPeers(peersToSendIDontWant)
|
||||
peersToSendIDontWant.exclIfIt(
|
||||
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
|
||||
)
|
||||
g.broadcast(
|
||||
peersToSendIDontWant,
|
||||
RPCMsg(
|
||||
control:
|
||||
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
|
||||
),
|
||||
isHighPriority = true,
|
||||
)
|
||||
|
||||
let validation = await g.validate(msg)
|
||||
|
||||
var seenPeers: HashSet[PubSubPeer]
|
||||
discard g.validationSeen.pop(saltedId, seenPeers)
|
||||
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
|
||||
lma_dup_during_validation.atomicInc(seenPeers.len.uint32)
|
||||
libp2p_gossipsub_saved_bytes.inc(
|
||||
(msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]
|
||||
)
|
||||
@@ -463,6 +509,17 @@ proc validateAndRelay(
|
||||
# Don't send it to peers that sent it during validation
|
||||
toSendPeers.excl(seenPeers)
|
||||
|
||||
#We have received IMReceiving from these peers, We should not exclude them
|
||||
#Ideally we should wait (TxTime + large safety cushion) before sending to these peers
|
||||
var receivingPeers: HashSet[PubSubPeer]
|
||||
for pr in toSendPeers:
|
||||
for heIsReceiving in pr.heIsReceivings:
|
||||
if msgId in heIsReceiving:
|
||||
receivingPeers.incl(pr)
|
||||
break
|
||||
toSendPeers.excl(receivingPeers)
|
||||
lma_imreceiving_saves.atomicInc(receivingPeers.len.uint32)
|
||||
|
||||
proc isMsgInIdontWant(it: PubSubPeer): bool =
|
||||
for iDontWant in it.iDontWants:
|
||||
if saltedId in iDontWant:
|
||||
@@ -470,6 +527,7 @@ proc validateAndRelay(
|
||||
libp2p_gossipsub_saved_bytes.inc(
|
||||
msg.data.len.int64, labelValues = ["idontwant"]
|
||||
)
|
||||
lma_idontwant_saves.atomicInc()
|
||||
return true
|
||||
return false
|
||||
|
||||
@@ -596,11 +654,13 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
|
||||
g.rewardDelivered(peer, topic, false, delay)
|
||||
|
||||
libp2p_gossipsub_duplicate.inc()
|
||||
lma_duplicate_count.atomicInc()
|
||||
|
||||
# onto the next message
|
||||
continue
|
||||
|
||||
libp2p_gossipsub_received.inc()
|
||||
lma_unique_receives.atomicInc()
|
||||
|
||||
# avoid processing messages we are not interested in
|
||||
if topic notin g.topics:
|
||||
|
||||
@@ -290,14 +290,31 @@ proc handleIHave*(
|
||||
for ihave in ihaves:
|
||||
trace "peer sent ihave", peer, topicID = ihave.topicID, msgs = ihave.messageIDs
|
||||
if ihave.topicID in g.topics:
|
||||
#look here for receieved idontwants for the same message
|
||||
var meshPeers: HashSet[PubSubPeer]
|
||||
g.mesh.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
|
||||
g.subscribedDirectPeers.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
|
||||
|
||||
for msgId in ihave.messageIDs:
|
||||
if not g.hasSeen(g.salt(msgId)):
|
||||
if peer.iHaveBudget <= 0:
|
||||
break
|
||||
elif msgId in g.iwantsRequested:
|
||||
break
|
||||
elif msgId notin res.messageIDs:
|
||||
res.messageIDs.add(msgId)
|
||||
dec peer.iHaveBudget
|
||||
trace "requested message via ihave", messageID = msgId
|
||||
#dont send IWANT if we have received (N number of) IDontWant(s) for a msgID
|
||||
let saltedID = g.salt(msgId)
|
||||
var numFinds: int = 0
|
||||
for meshPeer in meshPeers:
|
||||
for heDontWant in meshPeer.iDontWants:
|
||||
if saltedID in heDontWant:
|
||||
numFinds = numFinds + 1
|
||||
#break;
|
||||
if numFinds == 0: #We currently wait for 1 IDontWants
|
||||
res.messageIDs.add(msgId)
|
||||
dec peer.iHaveBudget
|
||||
g.iwantsRequested.incl(msgId)
|
||||
trace "requested message via ihave", messageID = msgId
|
||||
# shuffling res.messageIDs before sending it out to increase the likelihood
|
||||
# of getting an answer if the peer truncates the list due to internal size restrictions.
|
||||
g.rng.shuffle(res.messageIDs)
|
||||
@@ -309,6 +326,35 @@ proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWa
|
||||
if peer.iDontWants[^1].len > 1000:
|
||||
break
|
||||
peer.iDontWants[^1].incl(g.salt(messageId))
|
||||
|
||||
#Experimental change for quick performance evaluation only (Ideally for very large messages):
|
||||
#[
|
||||
1) IDontWant is followed by the message. IMReceiving informs peers that we are receiving this message
|
||||
2) Prototype implementation for a single topic ("test"). Need topic ID in IDontWant
|
||||
|
||||
3) Better solution is to send Message detail in a message preamble, That can be used for IMReceiving
|
||||
]#
|
||||
var toSendPeers = HashSet[PubSubPeer]()
|
||||
g.floodsub.withValue("test", peers): toSendPeers.incl(peers[])
|
||||
g.mesh.withValue("test", peers): toSendPeers.incl(peers[])
|
||||
|
||||
# add direct peers
|
||||
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault("test"))
|
||||
|
||||
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
|
||||
imreceiving: @[ControlIWant(messageIDs: @[messageId])]
|
||||
))), isHighPriority = true)
|
||||
|
||||
|
||||
|
||||
proc handleIMReceiving*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
imreceivings: seq[ControlIWant]) =
|
||||
for imreceiving in imreceivings:
|
||||
for messageId in imreceiving.messageIDs:
|
||||
if peer.heIsReceivings[^1].len > 1000: break
|
||||
if messageId.len > 100: continue
|
||||
peer.heIsReceivings[^1].incl(messageId)
|
||||
|
||||
proc handleIWant*(
|
||||
g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]
|
||||
|
||||
@@ -188,6 +188,8 @@ type
|
||||
|
||||
heartbeatEvents*: seq[AsyncEvent]
|
||||
|
||||
iwantsRequested*: HashSet[MessageId]
|
||||
|
||||
MeshMetrics* = object # scratch buffers for metrics
|
||||
otherPeersPerTopicMesh*: int64
|
||||
otherPeersPerTopicFanout*: int64
|
||||
|
||||
@@ -109,6 +109,7 @@ type
|
||||
## IDONTWANT contains unvalidated message id:s which may be long and/or
|
||||
## expensive to look up, so we apply the same salting to them as during
|
||||
## unvalidated message processing
|
||||
heIsReceivings*:Deque[HashSet[MessageId]]
|
||||
iHaveBudget*: int
|
||||
pingBudget*: int
|
||||
maxMessageSize: int
|
||||
@@ -557,4 +558,5 @@ proc new*(
|
||||
)
|
||||
result.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||
result.iDontWants.addFirst(default(HashSet[SaltedId]))
|
||||
result.heIsReceivings.addFirst(default(HashSet[MessageId]))
|
||||
result.startSendNonPriorityTask()
|
||||
|
||||
@@ -63,6 +63,7 @@ type
|
||||
graft*: seq[ControlGraft]
|
||||
prune*: seq[ControlPrune]
|
||||
idontwant*: seq[ControlIWant]
|
||||
imreceiving*: seq[ControlIWant]
|
||||
|
||||
ControlIHave* = object
|
||||
topicID*: string
|
||||
@@ -173,11 +174,12 @@ proc byteSize(controlPrune: ControlPrune): int =
|
||||
# 8 bytes for uint64
|
||||
|
||||
static:
|
||||
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
|
||||
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "imreceiving"])
|
||||
proc byteSize(control: ControlMessage): int =
|
||||
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
|
||||
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
|
||||
control.idontwant.foldl(a + b.byteSize, 0)
|
||||
control.idontwant.foldl(a + b.byteSize, 0) +
|
||||
control.imreceiving.foldl(a + b.byteSize, 0)
|
||||
|
||||
static:
|
||||
expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])
|
||||
|
||||
@@ -89,6 +89,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
|
||||
ipb.write(4, prune)
|
||||
for idontwant in control.idontwant:
|
||||
ipb.write(5, idontwant)
|
||||
for imreceiving in control.imreceiving:
|
||||
ipb.write(6, imreceiving)
|
||||
if len(ipb.buffer) > 0:
|
||||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
@@ -208,6 +210,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
|
||||
var graftpbs: seq[seq[byte]]
|
||||
var prunepbs: seq[seq[byte]]
|
||||
var idontwant: seq[seq[byte]]
|
||||
var imreceiving: seq[seq[byte]]
|
||||
if ?cpb.getRepeatedField(1, ihavepbs):
|
||||
for item in ihavepbs:
|
||||
control.ihave.add(?decodeIHave(initProtoBuffer(item)))
|
||||
@@ -223,6 +226,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
|
||||
if ?cpb.getRepeatedField(5, idontwant):
|
||||
for item in idontwant:
|
||||
control.idontwant.add(?decodeIWant(initProtoBuffer(item)))
|
||||
if ? cpb.getRepeatedField(6, imreceiving):
|
||||
for item in imreceiving:
|
||||
control.imreceiving.add(?decodeIWant(initProtoBuffer(item)))
|
||||
trace "decodeControl: message statistics",
|
||||
graft_count = len(control.graft),
|
||||
prune_count = len(control.prune),
|
||||
|
||||
@@ -750,7 +750,7 @@ proc new*(
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
minDuration = MinimumDuration,
|
||||
maxDuration = MaximumDuration,
|
||||
): T =
|
||||
): T {.raises: [RendezVousError].} =
|
||||
let rdv = T.new(rng, minDuration, maxDuration)
|
||||
rdv.setup(switch)
|
||||
return rdv
|
||||
|
||||
@@ -677,55 +677,76 @@ suite "GossipSub internal":
|
||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||
gossipSub.subscribe(topic, handler2)
|
||||
|
||||
# Instantiates 30 peers and connects all of them to the previously defined `gossipSub`
|
||||
for i in 0 ..< 30:
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
peer.handler = handler
|
||||
# Add the connection to `gossipSub`, to their `gossipSub.gossipsub` and `gossipSub.mesh` tables
|
||||
gossipSub.grafted(peer, topic)
|
||||
gossipSub.mesh[topic].incl(peer)
|
||||
|
||||
# Peers with no budget should not request messages
|
||||
block:
|
||||
# should ignore no budget peer
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
|
||||
peer.iHaveBudget = 0
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
check:
|
||||
iwants.messageIDs.len == 0
|
||||
|
||||
block:
|
||||
# given duplicate ihave should generate only one iwant
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
check:
|
||||
iwants.messageIDs.len == 1
|
||||
|
||||
block:
|
||||
# given duplicate iwant should generate only one message
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
# Add message to `gossipSub`'s message cache
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
gossipSub.mcache.put(id, Message())
|
||||
peer.sentIHaves[^1].incl(id)
|
||||
# Build an IHAVE message that contains the same message ID three times
|
||||
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
|
||||
# Given the peer has no budget to request messages
|
||||
peer.iHaveBudget = 0
|
||||
# When a peer makes an IHAVE request for the a message that `gossipSub` has
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
# Then `gossipSub` should not generate an IWant message for the message,
|
||||
check:
|
||||
iwants.messageIDs.len == 0
|
||||
|
||||
# Peers with budget should request messages. If ids are repeated, only one request should be generated
|
||||
block:
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
# Build an IHAVE message that contains the same message ID three times
|
||||
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
|
||||
# Given the budget is not 0 (because it's not been overridden)
|
||||
# When a peer makes an IHAVE request for the a message that `gossipSub` does not have
|
||||
let iwants = gossipSub.handleIHave(peer, @[msg])
|
||||
# Then `gossipSub` should generate an IWant message for the message
|
||||
check:
|
||||
iwants.messageIDs.len == 1
|
||||
|
||||
# Peers with budget should request messages. If ids are repeated, only one request should be generated
|
||||
block:
|
||||
# Define a new connection
|
||||
let conn = TestBufferStream.new(noop)
|
||||
conns &= conn
|
||||
let peerId = randomPeerId()
|
||||
conn.peerId = peerId
|
||||
let peer = gossipSub.getPubSubPeer(peerId)
|
||||
# Add message to `gossipSub`'s message cache
|
||||
let id = @[0'u8, 1, 2, 3]
|
||||
gossipSub.mcache.put(id, Message())
|
||||
peer.sentIHaves[^1].incl(id)
|
||||
# Build an IWANT message that contains the same message ID three times
|
||||
let msg = ControlIWant(messageIDs: @[id, id, id])
|
||||
# When a peer makes an IWANT request for the a message that `gossipSub` has
|
||||
let genmsg = gossipSub.handleIWant(peer, @[msg])
|
||||
# Then `gossipSub` should return the message
|
||||
check:
|
||||
genmsg.len == 1
|
||||
|
||||
|
||||
@@ -842,6 +842,8 @@ suite "Switch":
|
||||
switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
|
||||
switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
|
||||
|
||||
switch1.peerStore[LastSeenBook][switch2.peerInfo.peerId].isSome()
|
||||
|
||||
switch1.peerInfo.peerId notin switch2.peerStore[AddressBook]
|
||||
switch1.peerInfo.peerId notin switch2.peerStore[ProtoBook]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user