Compare commits


19 Commits

Author SHA1 Message Date
ufarooqstatus
05446734e3 corrected dup_during_validation count 2025-01-04 21:18:25 +05:00
ufarooqstatus
64ef502c87 Merge branch 'master' into lma_merge_imreceiving_iwant_1
updating local branch
2025-01-04 02:13:02 +05:00
diegomrsantos
1fa30f07e8 chore(ci): add arm64 for macOS (#1212)
This PR adds the macOS 14 GitHub runner that uses the arm64 CPU.
2024-12-20 21:18:56 -04:00
richΛrd
39d0451a10 chore: validate PR titles and commits, and autoassign PRs (#1227) 2024-12-20 15:42:54 +01:00
richΛrd
4dc7a89f45 chore: use latest nimpng and nico (#1229) 2024-12-17 14:37:06 -04:00
richΛrd
fd26f93b80 fix(ci): use nim 2.0 branch (#1228) 2024-12-09 19:41:08 +00:00
Etan Kissling
dd2c74d413 feat(nameresolving): Add {.async: (raises).} annotations (#1214)
Modernize `nameresolving` modules to track `{.async: (raises).}`.

---------

Co-authored-by: kaiserd <1684595+kaiserd@users.noreply.github.com>
Co-authored-by: richΛrd <info@richardramos.me>
2024-12-09 12:43:21 +01:00
Eugene Kabanov
b7e0df127f Fix PeerStore missing remote endpoints of established connection. (#1226) 2024-11-27 14:49:41 +02:00
kaiserd
f591e692fc chore(version): update libp2p.nimble to 1.7.1 (#1223)
minor version for rendezvous fix
2024-11-09 10:51:33 +07:00
NagyZoltanPeter
8855bce085 fix:missing raises pragma for one of RendezVous.new (#1222) 2024-11-08 11:11:51 +01:00
kaiserd
ed5670408b chore(version): update libp2p.nimble to 1.7.0 (#1213)
This commit is planned to be tagged with v1.7.0.
The only feature added in this version is configurable min and max TTL
for rendezvous.

Co-authored-by: ksr <kaiserd@users.noreply.github.com>
2024-11-03 18:15:33 +00:00
Álex
97192a3c80 fix(ci): nim 2.0 dependency failure (#1216)
Broken due to an update to Nimble 0.16.2 for the version-2-0 branch.
This PR sets the ref to the previous commit. It also updates the key
naming so it fits better.

Nim's commit list: https://github.com/nim-lang/Nim/commits/version-2-0/

# Important Note
In this PR, some required jobs' names were changed. They need to be
updated at the repo level so this PR can be merged.
2024-10-31 17:56:13 +00:00
Álex
294d06323c docs(test): handle IHAVE / IWANT tests (#1202)
Add documentation as requested.
2024-10-31 17:23:24 +00:00
ufarooqstatus
b2a75fc25e we make only one iwant request 2024-10-01 02:45:38 +05:00
ufarooqstatus
4b691b6374 set num_finds to 1 2024-09-29 23:07:42 +05:00
ufarooqstatus
8bb6215d8a imreceiving handling merged with iwant optimization 2024-09-27 03:27:12 +05:00
ufarooqstatus
f1b78f6be6 IMReceiving message added 2024-09-27 00:17:18 +05:00
ufarooqstatus
8377eb0362 stats places, warmup messages added 2024-09-24 23:19:49 +05:00
ufarooqstatus
35d1876ad8 added stats counters, still to check message receives from mesh after issuing iwant 2024-09-22 23:31:42 +05:00
28 changed files with 576 additions and 178 deletions

View File

@@ -6,7 +6,7 @@ inputs:
cpu:
description: "CPU to build for"
default: "amd64"
nim_branch:
nim_ref:
description: "Nim version"
default: "version-1-6"
shell:
@@ -88,6 +88,8 @@ runs:
run: |
if [[ '${{ inputs.cpu }}' == 'amd64' ]]; then
PLATFORM=x64
elif [[ '${{ inputs.cpu }}' == 'arm64' ]]; then
PLATFORM=arm64
else
PLATFORM=x86
fi
@@ -117,7 +119,7 @@ runs:
uses: actions/cache@v4
with:
path: '${{ github.workspace }}/nim'
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_branch }}-cache-${{ env.cache_nonce }}
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_ref }}-cache-${{ env.cache_nonce }}
- name: Build Nim and Nimble
shell: ${{ inputs.shell }}
@@ -126,6 +128,6 @@ runs:
# We don't want partial matches of the cache restored
rm -rf nim
curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ inputs.nim_branch }} \
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} NIM_COMMIT=${{ inputs.nim_ref }} \
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
bash build_nim.sh nim csources dist/nimble NimBinaries

.github/scripts/colors.sh (new file, 12 lines)
View File

@@ -0,0 +1,12 @@
#!/usr/bin/env bash
# Colors
export YLW='\033[1;33m'
export RED='\033[0;31m'
export GRN='\033[0;32m'
export BLU='\033[0;34m'
export BLD='\033[1m'
export RST='\033[0m'
# Clear line
export CLR='\033[2K'

.github/scripts/commit_check.sh (new executable file, 4 lines)
View File

@@ -0,0 +1,4 @@
#!/usr/bin/env bash
source .github/scripts/parse_commits.sh
parse_commits "$@"

.github/scripts/parse_commits.sh (new file, 30 lines)
View File

@@ -0,0 +1,30 @@
#!/usr/bin/env bash
# The output of this script is as follows:
# 1. One line "checking commits between: <start_commit> <end_commit>"
# 2. One line for each commit message that is not well-formed
set -euo pipefail
source .github/scripts/colors.sh
parse_commits() {
BASE_BRANCH=${BASE_BRANCH:-master}
start_commit=${1:-origin/${BASE_BRANCH}}
end_commit=${2:-HEAD}
exit_code=0
echo -e "${GRN}Checking commits between:${RST} $start_commit $end_commit"
# Run the loop in the current shell using process substitution
while IFS= read -r message || [ -n "$message" ]; do
# Check if commit message follows conventional commits format
if [[ ! $message =~ ^(build|chore|ci|docs|feat|fix|perf|refactor|revert|style|test)(\(.*\))?:.*$ ]]; then
echo -e "${YLW}Commit message is ill-formed:${RST} $message"
exit_code=1
fi
done < <(git log --format=%s "$start_commit".."$end_commit")
exit ${exit_code}
}
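As a quick illustration of the check (a hedged sketch, not part of the change set — it reuses the script's regex via Nim's `std/re`), a conventional subject passes and a bare one fails:

```nim
import std/re

# Same pattern as parse_commits.sh above.
let conventional =
  re"^(build|chore|ci|docs|feat|fix|perf|refactor|revert|style|test)(\(.*\))?:.*$"

for subject in ["fix(ci): use nim 2.0 branch", "updating local branch"]:
  let verdict = if subject.match(conventional): "ok" else: "ill-formed"
  echo subject, " -> ", verdict
```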

.github/workflows/auto_assign_pr.yml (new file, 12 lines)
View File

@@ -0,0 +1,12 @@
name: Auto Assign PR to Creator
on:
pull_request:
types:
- opened
jobs:
assign_creator:
runs-on: ubuntu-latest
steps:
- uses: toshimaru/auto-author-assign@v1.6.2

View File

@@ -27,12 +27,14 @@ jobs:
cpu: amd64
- os: macos
cpu: amd64
- os: macos-14
cpu: arm64
- os: windows
cpu: amd64
nim:
- branch: version-1-6
nim:
- ref: version-1-6
memory_management: refc
- branch: version-2-0
- ref: version-2-0
memory_management: refc
include:
- platform:
@@ -47,6 +49,10 @@ jobs:
os: macos
builder: macos-13
shell: bash
- platform:
os: macos-14
builder: macos-14
shell: bash
- platform:
os: windows
builder: windows-2022
@@ -56,7 +62,7 @@ jobs:
run:
shell: ${{ matrix.shell }}
name: '${{ matrix.platform.os }}-${{ matrix.platform.cpu }} (Nim ${{ matrix.nim.branch }})'
name: '${{ matrix.platform.os }}-${{ matrix.platform.cpu }} (Nim ${{ matrix.nim.ref }})'
runs-on: ${{ matrix.builder }}
steps:
- name: Checkout
@@ -70,12 +76,12 @@ jobs:
os: ${{ matrix.platform.os }}
cpu: ${{ matrix.platform.cpu }}
shell: ${{ matrix.shell }}
nim_branch: ${{ matrix.nim.branch }}
nim_ref: ${{ matrix.nim.ref }}
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '~1.15.5'
go-version: '~1.16.0' # That's the minimum Go version that works with arm.
- name: Install p2pd
run: |
@@ -86,9 +92,9 @@ jobs:
uses: actions/cache@v3
with:
path: nimbledeps
# Using nim.branch as a simple way to differentiate between nimble using the "pkgs" or "pkgs2" directories.
# The change happened on Nimble v0.14.0.
key: nimbledeps-${{ matrix.nim.branch }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
# Using nim.ref as a simple way to differentiate between nimble using the "pkgs" or "pkgs2" directories.
# The change happened on Nimble v0.14.0. Also forcing the deps to be reinstalled on each os and cpu.
key: nimbledeps-${{ matrix.nim.ref }}-${{ matrix.builder }}-${{ matrix.platform.cpu }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
- name: Install deps
if: ${{ steps.deps-cache.outputs.cache-hit != 'true' }}
@@ -109,5 +115,6 @@ jobs:
nim --version
nimble --version
gcc --version
NIMFLAGS="${NIMFLAGS} --mm:${{ matrix.nim.memory_management }}"
nimble test

.github/workflows/commit_lint.yml (new file, 77 lines)
View File

@@ -0,0 +1,77 @@
name: "Conventional Commits"
on:
pull_request:
types:
- opened
- edited
- reopened
- synchronize
jobs:
main:
name: Validate commit messages
runs-on: ubuntu-latest
permissions:
pull-requests: write
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
ref: ${{github.event.pull_request.head.ref}}
repository: ${{github.event.pull_request.head.repo.full_name}}
- name: Check commit message
id: check_commit_message
if: always()
run: |
set +e
base_sha=${{ github.event.pull_request.base.sha }}
head_sha=${{ github.event.pull_request.head.sha }}
output=$(.github/scripts/commit_check.sh "${base_sha}" "${head_sha}" 2>&1)
exit_code=$?
echo "${output}" | sed '$d'
echo "exit_code=${exit_code}" >> $GITHUB_OUTPUT
invalid_commit_messages=$(echo "${output}" | sed '1d;$d')
invalid_commit_messages=$(echo "${invalid_commit_messages}" | sed 's/\x1b\[[0-9;]*m//g') # Remove color codes
invalid_commit_messages=$(echo "${invalid_commit_messages}" | sed 's/^Commit message is ill-formed: //') # Remove prefix
if [[ $exit_code -ne 0 ]]; then
EOF=$(dd if=/dev/urandom bs=15 count=1 status=none | base64)
echo "error_message<<$EOF" >> "$GITHUB_ENV"
echo "${invalid_commit_messages}" >> "$GITHUB_ENV"
echo "$EOF" >> "$GITHUB_ENV"
fi
- name: "Publish failed commit messages"
uses: marocchino/sticky-pull-request-comment@v2
# When the previous step fails, the workflow would stop. By adding this
# condition you can continue the execution with the populated error message.
if: always() && (steps.check_commit_message.outputs.exit_code != 0)
with:
header: commit-message-lint-error
message: |
Commits must follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/)
Please fix these commit messages:
```
${{ env.error_message }}
```
# Delete a previous comment when the issue has been resolved
- name: "Delete previous comment"
if: ${{ steps.check_commit_message.outputs.exit_code == 0 }}
uses: marocchino/sticky-pull-request-comment@v2
with:
header: commit-message-lint-error
delete: true
- name: "Mark as failed"
if: steps.check_commit_message.outputs.exit_code != 0
uses: actions/github-script@v7
with:
script: |
core.setFailed("Some commit messages are ill-formed")

View File

@@ -10,5 +10,5 @@ jobs:
name: Daily amd64
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'branch': 'version-1-6', 'memory_management': 'refc'}, {'branch': 'version-2-0', 'memory_management': 'refc'}]"
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': 'version-2-0', 'memory_management': 'refc'}]"
cpu: "['amd64']"

View File

@@ -7,7 +7,7 @@ on:
nim:
description: 'Nim Configuration'
required: true
type: string # Following this format: [{"branch": ..., "memory_management": ...}, ...]
type: string # Following this format: [{"ref": ..., "memory_management": ...}, ...]
cpu:
description: 'CPU'
required: true
@@ -58,7 +58,7 @@ jobs:
run:
shell: ${{ matrix.platform.shell }}
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.branch }})'
name: '${{ matrix.platform.os }}-${{ matrix.cpu }} (Nim ${{ matrix.nim.ref }})'
runs-on: ${{ matrix.platform.builder }}
steps:
- name: Checkout
@@ -69,13 +69,13 @@ jobs:
with:
os: ${{ matrix.platform.os }}
shell: ${{ matrix.platform.shell }}
nim_branch: ${{ matrix.nim.branch }}
nim_ref: ${{ matrix.nim.ref }}
cpu: ${{ matrix.cpu }}
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '~1.15.5'
go-version: '~1.16.0'
cache: false
- name: Install p2pd

View File

@@ -10,5 +10,5 @@ jobs:
name: Daily Nim Devel
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'branch': 'devel', 'memory_management': 'orc'}]"
nim: "[{'ref': 'devel', 'memory_management': 'orc'}]"
cpu: "['amd64']"

View File

@@ -10,6 +10,6 @@ jobs:
name: Daily i386 (Linux)
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'branch': 'version-1-6', 'memory_management': 'refc'}, {'branch': 'version-2-0', 'memory_management': 'refc'}, {'branch': 'devel', 'memory_management': 'orc'}]"
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': 'version-2-0', 'memory_management': 'refc'}, {'ref': 'devel', 'memory_management': 'orc'}]"
cpu: "['i386']"
exclude: "[{'platform': {'os':'macos'}}, {'platform': {'os':'windows'}}]"

View File

@@ -10,6 +10,6 @@ jobs:
name: Daily SAT
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'branch': 'version-2-0', 'memory_management': 'refc'}]"
nim: "[{'ref': 'version-2-0', 'memory_management': 'refc'}]"
cpu: "['amd64']"
use_sat_solver: true

.github/workflows/pr_lint.yml (new file, 35 lines)
View File

@@ -0,0 +1,35 @@
name: "Conventional Commits"
on:
pull_request:
types:
- opened
- edited
- reopened
- synchronize
jobs:
main:
name: Validate PR title
runs-on: ubuntu-latest
permissions:
pull-requests: write
steps:
- uses: amannn/action-semantic-pull-request@v5
id: lint_pr_title
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- uses: marocchino/sticky-pull-request-comment@v2
# When the previous step fails, the workflow would stop. By adding this
# condition you can continue the execution with the populated error message.
if: always() && (steps.lint_pr_title.outputs.error_message != null)
with:
header: pr-title-lint-error
message: |
Pull requests titles must follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/)
# Delete a previous comment when the issue has been resolved
- if: ${{ steps.lint_pr_title.outputs.error_message == null }}
uses: marocchino/sticky-pull-request-comment@v2
with:
header: pr-title-lint-error
delete: true

View File

@@ -13,7 +13,7 @@ For more information about the go daemon, check out [this repository](https://gi
> **Required only** for running the tests.
# Prerequisites
Go with version `1.15.15`.
Go with version `1.16.0`.
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.
# Installation
@@ -21,7 +21,7 @@ Follow one of the methods below:
## Script
Run the build script while having the `go` command pointing to the correct Go version.
We recommend using `1.15.15`, as previously stated.
We recommend using `1.16.0`, as previously stated.
```sh
./scripts/build_p2pd.sh
```

View File

@@ -1,7 +1,7 @@
mode = ScriptMode.Verbose
packageName = "libp2p"
version = "1.6.0"
version = "1.7.1"
author = "Status Research & Development GmbH"
description = "LibP2P implementation"
license = "MIT"
@@ -125,9 +125,8 @@ task examples_build, "Build the samples":
buildSample("tutorial_3_protobuf", true)
buildSample("tutorial_4_gossipsub", true)
buildSample("tutorial_5_discovery", true)
exec "nimble install -y nimpng@#HEAD"
# this is to fix broken build on 1.7.3, remove it when nimpng version 0.3.2 or later is released
exec "nimble install -y nico@#af99dd60bf2b395038ece815ea1012330a80d6e6"
exec "nimble install -y nimpng"
exec "nimble install -y nico --passNim=--skipParentCfg"
buildSample("tutorial_6_game", false, "--styleCheck:off")
# pin system

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -10,7 +10,7 @@
{.push raises: [].}
import
std/[streams, strutils, sets, sequtils],
std/[streams, sets, sequtils],
chronos,
chronicles,
stew/byteutils,
@@ -39,24 +39,32 @@ proc questionToBuf(address: string, kind: QKind): seq[byte] =
var buf = newSeq[byte](dataLen)
discard requestStream.readData(addr buf[0], dataLen)
return buf
except CatchableError as exc:
buf
except IOError as exc:
info "Failed to created DNS buffer", description = exc.msg
return newSeq[byte](0)
newSeq[byte](0)
except OSError as exc:
info "Failed to created DNS buffer", description = exc.msg
newSeq[byte](0)
except ValueError as exc:
info "Failed to created DNS buffer", description = exc.msg
newSeq[byte](0)
proc getDnsResponse(
dnsServer: TransportAddress, address: string, kind: QKind
): Future[Response] {.async.} =
): Future[Response] {.
async: (raises: [CancelledError, IOError, OSError, TransportError, ValueError])
.} =
var sendBuf = questionToBuf(address, kind)
if sendBuf.len == 0:
raise newException(ValueError, "Incorrect DNS query")
let receivedDataFuture = newFuture[void]()
let receivedDataFuture = Future[void].Raising([CancelledError]).init()
proc datagramDataReceived(
transp: DatagramTransport, raddr: TransportAddress
): Future[void] {.async, closure.} =
): Future[void] {.async: (raises: []), closure.} =
receivedDataFuture.complete()
let sock =
@@ -68,27 +76,41 @@ proc getDnsResponse(
try:
await sock.sendTo(dnsServer, addr sendBuf[0], sendBuf.len)
await receivedDataFuture or sleepAsync(5.seconds) #unix default
if not receivedDataFuture.finished:
try:
await receivedDataFuture.wait(5.seconds) #unix default
except AsyncTimeoutError:
raise newException(IOError, "DNS server timeout")
let rawResponse = sock.getMessage()
# parseResponse can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
return exceptionToAssert:
try:
parseResponse(string.fromBytes(rawResponse))
except IOError as exc:
raise exc
except OSError as exc:
raise exc
except ValueError as exc:
raise exc
except Exception as exc:
# Nim 1.6: parseResponse can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert exc.msg
finally:
await sock.closeWait()
method resolveIp*(
self: DnsResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError])
.} =
trace "Resolving IP using DNS", address, servers = self.nameServers.mapIt($it), domain
for _ in 0 ..< self.nameServers.len:
let server = self.nameServers[0]
var responseFutures: seq[Future[Response]]
var responseFutures: seq[
Future[Response].Raising(
[CancelledError, IOError, OSError, TransportError, ValueError]
)
]
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
responseFutures.add(getDnsResponse(server, address, A))
@@ -103,23 +125,32 @@ method resolveIp*(
var
resolvedAddresses: OrderedSet[string]
resolveFailed = false
template handleFail(e): untyped =
info "Failed to query DNS", address, error = e.msg
resolveFailed = true
break
for fut in responseFutures:
try:
let resp = await fut
for answer in resp.answers:
# toString can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
resolvedAddresses.incl(exceptionToAssert(answer.toString()))
resolvedAddresses.incl(answer.toString())
except CancelledError as e:
raise e
except ValueError as e:
info "Invalid DNS query", address, error = e.msg
return @[]
except CatchableError as e:
info "Failed to query DNS", address, error = e.msg
resolveFailed = true
break
except IOError as e:
handleFail(e)
except OSError as e:
handleFail(e)
except TransportError as e:
handleFail(e)
except Exception as e:
# Nim 1.6: answer.toString can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert e.msg
if resolveFailed:
self.nameServers.add(self.nameServers[0])
@@ -132,27 +163,39 @@ method resolveIp*(
debug "Failed to resolve address, returning empty set"
return @[]
method resolveTxt*(self: DnsResolver, address: string): Future[seq[string]] {.async.} =
method resolveTxt*(
self: DnsResolver, address: string
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
trace "Resolving TXT using DNS", address, servers = self.nameServers.mapIt($it)
for _ in 0 ..< self.nameServers.len:
let server = self.nameServers[0]
try:
# toString can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
let response = await getDnsResponse(server, address, TXT)
return exceptionToAssert:
trace "Got TXT response",
server = $server, answer = response.answers.mapIt(it.toString())
response.answers.mapIt(it.toString())
except CancelledError as e:
raise e
except CatchableError as e:
template handleFail(e): untyped =
info "Failed to query DNS", address, error = e.msg
self.nameServers.add(self.nameServers[0])
self.nameServers.delete(0)
continue
try:
let response = await getDnsResponse(server, address, TXT)
trace "Got TXT response",
server = $server, answer = response.answers.mapIt(it.toString())
return response.answers.mapIt(it.toString())
except CancelledError as e:
raise e
except IOError as e:
handleFail(e)
except OSError as e:
handleFail(e)
except TransportError as e:
handleFail(e)
except ValueError as e:
handleFail(e)
except Exception as e:
# Nim 1.6: toString can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert e.msg
debug "Failed to resolve TXT, returning empty set"
return @[]
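The `handleFail` rotation and the explicit `except` ladder above follow one pattern used throughout this file. A minimal standalone sketch of it (chronos assumed; the proc names are illustrative, not from the PR): `{.async: (raises: [...]).}` makes the compiler reject any exception that could escape but is not listed, and a trailing `except Exception` with `raiseAssert` absorbs branches that the Nim 1.6 effect system infers but that cannot actually fire.

```nim
import chronos

proc fetchAnswer(): Future[string] {.
    async: (raises: [CancelledError, ValueError])
.} =
  # Raising anything not listed above is a compile-time error here.
  if false:
    raise newException(ValueError, "bad query")
  return "answer"

proc run() {.async: (raises: []).} =
  try:
    echo await fetchAnswer()
  except CancelledError:
    discard # cancellation handled explicitly
  except ValueError as exc:
    echo "query failed: ", exc.msg
  except Exception as exc:
    # Mirrors the file above: a branch the effect system infers
    # but that cannot actually fire is escalated to a Defect.
    raiseAssert exc.msg

waitFor run()
```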

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -25,17 +25,25 @@ type MockResolver* = ref object of NameResolver
method resolveIp*(
self: MockResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError])
.} =
var res: seq[TransportAddress]
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, false)):
result.add(initTAddress(resp, port))
res.add(initTAddress(resp, port))
if domain == Domain.AF_INET6 or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, true)):
result.add(initTAddress(resp, port))
res.add(initTAddress(resp, port))
method resolveTxt*(self: MockResolver, address: string): Future[seq[string]] {.async.} =
return self.txtResponses.getOrDefault(address)
res
method resolveTxt*(
self: MockResolver, address: string
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
self.txtResponses.getOrDefault(address)
proc new*(T: typedesc[MockResolver]): T =
T()

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -9,7 +9,7 @@
{.push raises: [].}
import std/[sugar, sets, sequtils, strutils]
import std/[sets, sequtils, strutils]
import chronos, chronicles, stew/endians2
import ".."/[multiaddress, multicodec]
@@ -20,19 +20,17 @@ type NameResolver* = ref object of RootObj
method resolveTxt*(
self: NameResolver, address: string
): Future[seq[string]] {.async, base.} =
): Future[seq[string]] {.async: (raises: [CancelledError]), base.} =
## Get TXT record
##
doAssert(false, "Not implemented!")
raiseAssert "Not implemented!"
method resolveIp*(
self: NameResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async, base.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError]), base
.} =
## Resolve the specified address
##
doAssert(false, "Not implemented!")
raiseAssert "Not implemented!"
proc getHostname*(ma: MultiAddress): string =
let
@@ -46,30 +44,40 @@ proc getHostname*(ma: MultiAddress): string =
proc resolveOneAddress(
self: NameResolver, ma: MultiAddress, domain: Domain = Domain.AF_UNSPEC, prefix = ""
): Future[seq[MultiAddress]] {.async.} =
#Resolve a single address
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
# Resolve a single address
let portPart = ma[1].valueOr:
raise maErr error
var pbuf: array[2, byte]
var dnsval = getHostname(ma)
if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0:
raise newException(MaError, "Incorrect port number")
let plen = portPart.protoArgument(pbuf).valueOr:
raise maErr error
if plen == 0:
raise maErr "Incorrect port number"
let
port = Port(fromBytesBE(uint16, pbuf))
dnsval = getHostname(ma)
resolvedAddresses = await self.resolveIp(prefix & dnsval, port, domain)
return collect(newSeqOfCap(4)):
for address in resolvedAddresses:
var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet()
for part in ma:
if DNS.match(part.tryGet()):
continue
createdAddress &= part.tryGet()
createdAddress
resolvedAddresses.mapIt:
let address = MultiAddress.init(it).valueOr:
raise maErr error
var createdAddress = address[0].valueOr:
raise maErr error
for part in ma:
let part = part.valueOr:
raise maErr error
if DNS.match(part):
continue
createdAddress &= part
createdAddress
proc resolveDnsAddr*(
self: NameResolver, ma: MultiAddress, depth: int = 0
): Future[seq[MultiAddress]] {.async.} =
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
if not DNSADDR.matchPartial(ma):
return @[ma]
@@ -78,54 +86,67 @@ proc resolveDnsAddr*(
info "Stopping DNSADDR recursion, probably malicious", ma
return @[]
var dnsval = getHostname(ma)
let txt = await self.resolveTxt("_dnsaddr." & dnsval)
let
dnsval = getHostname(ma)
txt = await self.resolveTxt("_dnsaddr." & dnsval)
trace "txt entries", txt
var result: seq[MultiAddress]
const codec = multiCodec("p2p")
let maCodec = block:
let hasCodec = ma.contains(codec).valueOr:
raise maErr error
if hasCodec:
ma[codec]
else:
(static(default(MaResult[MultiAddress])))
var res: seq[MultiAddress]
for entry in txt:
if not entry.startsWith("dnsaddr="):
continue
let entryValue = MultiAddress.init(entry[8 ..^ 1]).tryGet()
if entryValue.contains(multiCodec("p2p")).tryGet() and
ma.contains(multiCodec("p2p")).tryGet():
if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]:
continue
let
entryValue = MultiAddress.init(entry[8 ..^ 1]).valueOr:
raise maErr error
entryHasCodec = entryValue.contains(multiCodec("p2p")).valueOr:
raise maErr error
if entryHasCodec and maCodec.isOk and entryValue[codec] != maCodec:
continue
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
for r in resolved:
result.add(r)
res.add(r)
if result.len == 0:
if res.len == 0:
debug "Failed to resolve a DNSADDR", ma
return @[]
return result
res
proc resolveMAddress*(
self: NameResolver, address: MultiAddress
): Future[seq[MultiAddress]] {.async.} =
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
var res = initOrderedSet[MultiAddress]()
if not DNS.matchPartial(address):
res.incl(address)
else:
let code = address[0].tryGet().protoCode().tryGet()
let seq =
case code
of multiCodec("dns"):
await self.resolveOneAddress(address)
of multiCodec("dns4"):
await self.resolveOneAddress(address, Domain.AF_INET)
of multiCodec("dns6"):
await self.resolveOneAddress(address, Domain.AF_INET6)
of multiCodec("dnsaddr"):
await self.resolveDnsAddr(address)
else:
assert false
@[address]
for ad in seq:
let
firstPart = address[0].valueOr:
raise maErr error
code = firstPart.protoCode().valueOr:
raise maErr error
ads =
case code
of multiCodec("dns"):
await self.resolveOneAddress(address)
of multiCodec("dns4"):
await self.resolveOneAddress(address, Domain.AF_INET)
of multiCodec("dns6"):
await self.resolveOneAddress(address, Domain.AF_INET6)
of multiCodec("dnsaddr"):
await self.resolveDnsAddr(address)
else:
raise maErr("Unsupported codec " & $code)
for ad in ads:
res.incl(ad)
return res.toSeq
res.toSeq

View File

@@ -63,6 +63,7 @@ type
KeyBook* {.public.} = ref object of PeerBook[PublicKey]
AgentBook* {.public.} = ref object of PeerBook[string]
LastSeenBook* {.public.} = ref object of PeerBook[Opt[MultiAddress]]
ProtoVersionBook* {.public.} = ref object of PeerBook[string]
SPRBook* {.public.} = ref object of PeerBook[Envelope]
@@ -145,10 +146,16 @@ proc del*(peerStore: PeerStore, peerId: PeerId) {.public.} =
for _, book in peerStore.books:
book.deletor(peerId)
proc updatePeerInfo*(peerStore: PeerStore, info: IdentifyInfo) =
if info.addrs.len > 0:
proc updatePeerInfo*(
peerStore: PeerStore,
info: IdentifyInfo,
observedAddr: Opt[MultiAddress] = Opt.none(MultiAddress),
) =
if len(info.addrs) > 0:
peerStore[AddressBook][info.peerId] = info.addrs
peerStore[LastSeenBook][info.peerId] = observedAddr
info.pubkey.withValue(pubkey):
peerStore[KeyBook][info.peerId] = pubkey
@@ -200,7 +207,7 @@ proc identify*(peerStore: PeerStore, muxer: Muxer) {.async.} =
knownAgent = shortAgent
muxer.connection.setShortAgent(knownAgent)
peerStore.updatePeerInfo(info)
peerStore.updatePeerInfo(info, stream.observedAddr)
finally:
await stream.closeWithEOF()

View File

@@ -36,6 +36,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
export types, scoring, behavior, pubsub
import std/atomics
const WARMUP_THRESHOLD = 2
var
lma_dup_during_validation: Atomic[uint32] # number of duplicates during 1st message validation
lma_idontwant_saves: Atomic[uint32] # number of transmissions saved due to idontwant
lma_duplicate_count: Atomic[uint32] # number of duplicate messages received
lma_iwants_sent: Atomic[uint32] # number of iwant requests sent
lma_iwants_replied: Atomic[uint32] # number of iwant messages that are replied
lma_imreceiving_saves: Atomic[uint32] # number of message sends saved due to imreceiving
lma_unique_receives: Atomic[uint32] # number of unique messages received
lma_mesh_recvs_after_iwant: Atomic[uint32] # messages received from the mesh after sending an iwant request
lma_warmup_messages: Atomic[uint32] # don't issue idontwant while warmup messages < WARMUP_THRESHOLD
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_after_iwant.store(0)
lma_warmup_messages.store(0)
export lma_dup_during_validation, lma_idontwant_saves, lma_duplicate_count, lma_iwants_sent,
lma_iwants_replied, lma_imreceiving_saves, lma_unique_receives, lma_mesh_recvs_after_iwant
logScope:
topics = "libp2p gossipsub"
@@ -226,6 +252,7 @@ method init*(g: GossipSub) =
g.codecs &= GossipSubCodec_12
g.codecs &= GossipSubCodec_11
g.codecs &= GossipSubCodec_10
g.iwantsRequested = initHashSet[MessageId]()
method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
g.withPeerStats(peer.peerId) do(stats: var PeerStats):
@@ -347,6 +374,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
var respControl: ControlMessage
g.handleIDontWant(peer, control.idontwant)
g.handleIMReceiving(peer, control.imreceiving)
let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIDs.len > 0:
respControl.iwant.add(iwant)
@@ -360,6 +388,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
if isPruneNotEmpty or isIWantNotEmpty:
if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
lma_iwants_sent.atomicInc(respControl.iwant.len.uint32)
if isPruneNotEmpty:
for prune in respControl.prune:
@@ -381,6 +410,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
# iwant replies have lower priority
trace "sending iwant reply messages", peer
lma_iwants_replied.atomicInc(messages.len.uint32)
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
proc validateAndRelay(
@@ -397,6 +427,8 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
g.subscribedDirectPeers.withValue(topic, peers):
toSendPeers.incl(peers[])
if not (peer in toSendPeers):
lma_mesh_recvs_after_iwant.atomicInc()
toSendPeers.excl(peer)
if msg.data.len > max(512, msgId.len * 10):
@@ -409,25 +441,39 @@ proc validateAndRelay(
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
if lma_warmup_messages.load() < WARMUP_THRESHOLD:
lma_warmup_messages.atomicInc()
if lma_warmup_messages.load() == WARMUP_THRESHOLD:
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_after_iwant.store(0)
else:
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
let validation = await g.validate(msg)
var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(saltedId, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
lma_dup_during_validation.atomicInc(seenPeers.len.uint32)
libp2p_gossipsub_saved_bytes.inc(
(msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]
)
@@ -463,6 +509,17 @@ proc validateAndRelay(
# Don't send it to peers that sent it during validation
toSendPeers.excl(seenPeers)
# We have received IMReceiving from these peers; we should not exclude them outright.
# Ideally we should wait (TxTime + a large safety cushion) before sending to these peers.
var receivingPeers: HashSet[PubSubPeer]
for pr in toSendPeers:
for heIsReceiving in pr.heIsReceivings:
if msgId in heIsReceiving:
receivingPeers.incl(pr)
break
toSendPeers.excl(receivingPeers)
lma_imreceiving_saves.atomicInc(receivingPeers.len.uint32)
proc isMsgInIdontWant(it: PubSubPeer): bool =
for iDontWant in it.iDontWants:
if saltedId in iDontWant:
@@ -470,6 +527,7 @@ proc validateAndRelay(
libp2p_gossipsub_saved_bytes.inc(
msg.data.len.int64, labelValues = ["idontwant"]
)
lma_idontwant_saves.atomicInc()
return true
return false
@@ -596,11 +654,13 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
g.rewardDelivered(peer, topic, false, delay)
libp2p_gossipsub_duplicate.inc()
lma_duplicate_count.atomicInc()
# onto the next message
continue
libp2p_gossipsub_received.inc()
lma_unique_receives.atomicInc()
# avoid processing messages we are not interested in
if topic notin g.topics:
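The `lma_*` instrumentation added above boils down to one small pattern, sketched standalone here (a hedged sketch using only `std/atomics`; names and the threshold are illustrative): module-level atomic counters are bumped from handlers, and everything is zeroed once the warm-up window ends so steady-state statistics are not skewed by startup traffic.

```nim
import std/atomics

const WarmupThreshold = 2

var
  duplicateCount: Atomic[uint32] # e.g. duplicates received
  warmupMessages: Atomic[uint32] # messages seen so far during warm-up

proc onMessage(isDuplicate: bool) =
  if isDuplicate:
    duplicateCount.atomicInc()
  if warmupMessages.load() < WarmupThreshold:
    warmupMessages.atomicInc()
    if warmupMessages.load() == WarmupThreshold:
      # Warm-up just ended: zero the stats gathered so far.
      duplicateCount.store(0)

when isMainModule:
  onMessage(isDuplicate = true)
  onMessage(isDuplicate = true)
  onMessage(isDuplicate = true)
  echo duplicateCount.load() # 1: the two warm-up duplicates were discarded
```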

View File

@@ -290,14 +290,31 @@ proc handleIHave*(
for ihave in ihaves:
trace "peer sent ihave", peer, topicID = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.topics:
# look here for received idontwants for the same message
var meshPeers: HashSet[PubSubPeer]
g.mesh.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
g.subscribedDirectPeers.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
for msgId in ihave.messageIDs:
if not g.hasSeen(g.salt(msgId)):
if peer.iHaveBudget <= 0:
break
elif msgId in g.iwantsRequested:
break
elif msgId notin res.messageIDs:
res.messageIDs.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID = msgId
# don't send IWANT if we have received N IDontWants for this msgID
let saltedID = g.salt(msgId)
var numFinds: int = 0
for meshPeer in meshPeers:
for heDontWant in meshPeer.iDontWants:
if saltedID in heDontWant:
numFinds = numFinds + 1
#break;
if numFinds == 0: # a single IDontWant is currently enough to suppress the IWANT
res.messageIDs.add(msgId)
dec peer.iHaveBudget
g.iwantsRequested.incl(msgId)
trace "requested message via ihave", messageID = msgId
# shuffling res.messageIDs before sending it out to increase the likelihood
# of getting an answer if the peer truncates the list due to internal size restrictions.
g.rng.shuffle(res.messageIDs)
@@ -309,6 +326,35 @@ proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWa
if peer.iDontWants[^1].len > 1000:
break
peer.iDontWants[^1].incl(g.salt(messageId))
# Experimental change for quick performance evaluation only (ideally for very large messages):
#[
1) IDontWant is followed by the message; IMReceiving informs peers that we are receiving this message.
2) Prototype implementation for a single topic ("test"); we need the topic ID in IDontWant.
3) A better solution is to send message details in a message preamble, which can then be used for IMReceiving.
]#
var toSendPeers = HashSet[PubSubPeer]()
g.floodsub.withValue("test", peers): toSendPeers.incl(peers[])
g.mesh.withValue("test", peers): toSendPeers.incl(peers[])
# add direct peers
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault("test"))
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
imreceiving: @[ControlIWant(messageIDs: @[messageId])]
))), isHighPriority = true)
proc handleIMReceiving*(g: GossipSub,
peer: PubSubPeer,
imreceivings: seq[ControlIWant]) =
for imreceiving in imreceivings:
for messageId in imreceiving.messageIDs:
if peer.heIsReceivings[^1].len > 1000: break
if messageId.len > 100: continue
peer.heIsReceivings[^1].incl(messageId)
proc handleIWant*(
g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]
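Both the salted `iDontWants` sets and the new `heIsReceivings` deque rely on the same bounded-window bookkeeping. A standalone sketch of that pattern (illustrative names; the 1000-entry and 100-byte caps are copied from `handleIMReceiving` above):

```nim
import std/[deques, sets]

type MessageId = seq[byte]

# One set per history window; handlers write into the most recent one.
var heIsReceivings = initDeque[HashSet[MessageId]]()
heIsReceivings.addFirst(default(HashSet[MessageId]))

proc record(msgId: MessageId) =
  if heIsReceivings[^1].len > 1000: # cap entries per window
    return
  if msgId.len > 100: # ignore oversized ids
    return
  heIsReceivings[^1].incl(msgId)

record(@[0'u8, 1, 2, 3])
echo heIsReceivings[^1].len # 1
```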

View File

@@ -188,6 +188,8 @@ type
heartbeatEvents*: seq[AsyncEvent]
iwantsRequested*: HashSet[MessageId]
MeshMetrics* = object # scratch buffers for metrics
otherPeersPerTopicMesh*: int64
otherPeersPerTopicFanout*: int64

View File

@@ -109,6 +109,7 @@ type
## IDONTWANT contains unvalidated message id:s which may be long and/or
## expensive to look up, so we apply the same salting to them as during
## unvalidated message processing
heIsReceivings*: Deque[HashSet[MessageId]]
iHaveBudget*: int
pingBudget*: int
maxMessageSize: int
@@ -557,4 +558,5 @@ proc new*(
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.iDontWants.addFirst(default(HashSet[SaltedId]))
result.heIsReceivings.addFirst(default(HashSet[MessageId]))
result.startSendNonPriorityTask()

View File

@@ -63,6 +63,7 @@ type
graft*: seq[ControlGraft]
prune*: seq[ControlPrune]
idontwant*: seq[ControlIWant]
imreceiving*: seq[ControlIWant]
ControlIHave* = object
topicID*: string
@@ -173,11 +174,12 @@ proc byteSize(controlPrune: ControlPrune): int =
# 8 bytes for uint64
static:
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "imreceiving"])
proc byteSize(control: ControlMessage): int =
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
control.idontwant.foldl(a + b.byteSize, 0)
control.idontwant.foldl(a + b.byteSize, 0) +
control.imreceiving.foldl(a + b.byteSize, 0)
static:
expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])

View File

@@ -89,6 +89,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(4, prune)
for idontwant in control.idontwant:
ipb.write(5, idontwant)
for imreceiving in control.imreceiving:
ipb.write(6, imreceiving)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
@@ -208,6 +210,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
var graftpbs: seq[seq[byte]]
var prunepbs: seq[seq[byte]]
var idontwant: seq[seq[byte]]
var imreceiving: seq[seq[byte]]
if ?cpb.getRepeatedField(1, ihavepbs):
for item in ihavepbs:
control.ihave.add(?decodeIHave(initProtoBuffer(item)))
@@ -223,6 +226,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
if ?cpb.getRepeatedField(5, idontwant):
for item in idontwant:
control.idontwant.add(?decodeIWant(initProtoBuffer(item)))
if ? cpb.getRepeatedField(6, imreceiving):
for item in imreceiving:
control.imreceiving.add(?decodeIWant(initProtoBuffer(item)))
trace "decodeControl: message statistics",
graft_count = len(control.graft),
prune_count = len(control.prune),

View File

@@ -750,7 +750,7 @@ proc new*(
rng: ref HmacDrbgContext = newRng(),
minDuration = MinimumDuration,
maxDuration = MaximumDuration,
): T =
): T {.raises: [RendezVousError].} =
let rdv = T.new(rng, minDuration, maxDuration)
rdv.setup(switch)
return rdv
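With the new `{.raises: [RendezVousError].}` annotation, out-of-range TTL bounds fail loudly at construction time. A hedged usage sketch (duration values are illustrative; per the body above, the overload also takes the `switch` it wires up via `setup`):

```nim
import chronos
import libp2p
import libp2p/protocols/rendezvous

let switch = newStandardSwitch()
try:
  # minDuration/maxDuration default to MinimumDuration/MaximumDuration.
  let rdv = RendezVous.new(switch, minDuration = 2.hours, maxDuration = 48.hours)
  echo "rendezvous ready"
except RendezVousError as exc:
  echo "bad TTL bounds: ", exc.msg
```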

View File

@@ -677,55 +677,76 @@ suite "GossipSub internal":
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.subscribe(topic, handler2)
# Instantiates 30 peers and connects all of them to the previously defined `gossipSub`
for i in 0 ..< 30:
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
# Add the connection to `gossipSub`, to their `gossipSub.gossipsub` and `gossipSub.mesh` tables
gossipSub.grafted(peer, topic)
gossipSub.mesh[topic].incl(peer)
# Peers with no budget should not request messages
block:
# should ignore no budget peer
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3]
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
peer.iHaveBudget = 0
let iwants = gossipSub.handleIHave(peer, @[msg])
check:
iwants.messageIDs.len == 0
block:
# given duplicate ihave should generate only one iwant
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3]
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
let iwants = gossipSub.handleIHave(peer, @[msg])
check:
iwants.messageIDs.len == 1
block:
# given duplicate iwant should generate only one message
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IHAVE message that contains the same message ID three times
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
# Given the peer has no budget to request messages
peer.iHaveBudget = 0
# When a peer makes an IHAVE request for a message that `gossipSub` has
let iwants = gossipSub.handleIHave(peer, @[msg])
# Then `gossipSub` should not generate an IWant message for the message
check:
iwants.messageIDs.len == 0
# Peers with budget should request messages. If ids are repeated, only one request should be generated
block:
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3]
# Build an IHAVE message that contains the same message ID three times
let msg = ControlIHave(topicID: topic, messageIDs: @[id, id, id])
# Given the budget is not 0 (because it has not been overridden)
# When a peer makes an IHAVE request for a message that `gossipSub` does not have
let iwants = gossipSub.handleIHave(peer, @[msg])
# Then `gossipSub` should generate an IWant message for the message
check:
iwants.messageIDs.len == 1
# Peers requesting a message via IWANT should get it. If ids are repeated, only one message should be returned
block:
# Define a new connection
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
# Add message to `gossipSub`'s message cache
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
# Build an IWANT message that contains the same message ID three times
let msg = ControlIWant(messageIDs: @[id, id, id])
# When a peer makes an IWANT request for a message that `gossipSub` has
let genmsg = gossipSub.handleIWant(peer, @[msg])
# Then `gossipSub` should return the message
check:
genmsg.len == 1

View File

@@ -842,6 +842,8 @@ suite "Switch":
switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
switch1.peerStore[LastSeenBook][switch2.peerInfo.peerId].isSome()
switch1.peerInfo.peerId notin switch2.peerStore[AddressBook]
switch1.peerInfo.peerId notin switch2.peerStore[ProtoBook]