Compare commits

...

25 Commits

Author SHA1 Message Date
NagyZoltanPeter
8368c3f1dc Simplify code based on review findings. 2025-02-20 09:08:51 +01:00
NagyZoltanPeter
313d0fc53b Return distinct publishing outcomes as error results for waku 2025-02-19 11:02:00 +01:00
NagyZoltanPeter
4f5a20630a Upon the latest libp2p release, 1.8.0:
pubsub/gossipsub publish is refactored to ease tailoring without affecting the interface.
pubsub observers are modified with distinct onBeforeSend and onAfterSent callbacks; this helps keep message sending on the fast path (a sketch follows this entry).
(This is needed for nwaku, which has no intent of modifying messages before forwarding but still observes them.)

Adapt pubsub tests to the changed observers
2025-02-18 12:33:24 +01:00
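
The observer shape itself is not shown on this page; the following is a minimal sketch of the split callbacks the message describes, with stand-in types, and is an assumption rather than the actual nim-libp2p definitions:

```nim
# Sketch only: distinct before/after-send hooks on a pubsub observer.
# PubSubPeer and RPCMsg are stand-ins for the real nim-libp2p types.
type
  PubSubPeer = ref object
  RPCMsg = object
    data: seq[byte]

  PubSubObserver = ref object
    # runs before a message is handed to the send path and may rewrite it
    onBeforeSend: proc(peer: PubSubPeer, msg: var RPCMsg) {.gcsafe, raises: [].}
    # runs after the send; read-only, so observers that only watch traffic
    # (like nwaku) stay off the message-mutation path
    onAfterSent: proc(peer: PubSubPeer, msg: RPCMsg) {.gcsafe, raises: [].}
```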
vladopajic
fc6ac07ce8 ci: utilize github action for nph check (#1257) 2025-02-17 15:42:58 +00:00
vladopajic
79cdc31b37 ci: remove commit message check (#1256) 2025-02-17 12:35:42 +00:00
vladopajic
be33ad6ac7 chore: specify raising exceptions in daemon module (#1249) 2025-02-14 15:36:29 +01:00
vladopajic
a6e45d6157 chore: list raised exceptions in utils module (#1252)
Part of the https://github.com/vacp2p/nim-libp2p/issues/962 effort.
2025-02-13 11:27:29 -04:00
richΛrd
37e0f61679 chore: update maintainers (#1248) 2025-02-11 14:32:12 -04:00
vladopajic
5d382b6423 style: fix code style with nph (#1246) 2025-02-11 15:39:08 +00:00
richΛrd
78a4344054 refactor(pubsub): do not raise exceptions on publish (#1244) 2025-02-07 01:30:21 +00:00
Ivan FB
a4f0a638e7 chore: add isClosedRemotely public bool attribute in lpstream (#1242)
Also adds `gcsafe` to the getStreams method so that it can be invoked from async procs
2025-02-06 09:54:00 -04:00
richΛrd
c5aa3736f9 chore(version): update libp2p.nimble to 1.8.0 (#1236)
This PR's commit is planned to be tagged as v1.8.0
2025-01-22 08:45:56 -04:00
richΛrd
b0f83fd48c fix: use target repository branch as suffix for nim-libp2p-auto-bump- (#1238)
The branch `nim-libp2p-auto-bump-unstable` was not getting auto-bumped
because at one point the nim-libp2p `unstable` branch was renamed to
`master`. Since the workflow file depended on `GITHUB_REF`, it was
updating `nim-libp2p-auto-bump-master` instead of
`nim-libp2p-auto-bump-unstable`.
2025-01-15 14:25:21 -04:00
richΛrd
d6e5094095 fix(pubsub): revert async: raises: [] annotation for TopicHandler and ValidatorHandler (#1237) 2025-01-15 03:35:24 +00:00
richΛrd
483e1d91ba feat: idontwant on publish (#1230)
Closes #1224 

The following changes were made:
1. Once a message is broadcast, if it exceeds the threshold for
considering it a large message, an IDONTWANT message is sent before the
message is published (see the sketch after this entry)

4f9c21f699/libp2p/protocols/pubsub/gossipsub.nim (L800-L801)
2. Extracts the logic for broadcasting an IDONTWANT message and for
verifying the size of a message into separate functions
3. Adds a template to filter the values of a hashset without modifying
the original
2025-01-14 16:59:38 -04:00
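
A hedged sketch of the flow in point 1; the threshold value and helper bodies here are illustrative, not gossipsub's actual code:

```nim
# Sketch only: emit IDONTWANT for large messages before publishing them.
const largeMessageThreshold = 16 * 1024 # bytes; illustrative value

proc broadcastIDontWant(msgId: seq[byte]) =
  echo "IDONTWANT sent to mesh peers"

proc publish(msgId, data: seq[byte]) =
  if data.len > largeMessageThreshold:
    # tell mesh peers we already hold the message, so they do not
    # push the full payload back to us
    broadcastIDontWant(msgId)
  echo "publishing ", data.len, " bytes"
```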
richΛrd
d215bb21e0 fix(multiaddress): don't re-export minprotobuf (#1235) 2025-01-14 15:47:06 -04:00
richΛrd
61ac0c5b95 feat(pubsub): add {.async: (raises).} annotations (#1233)
This PR adds `{.async: (raises).}` annotations to the pubsub package.
The cases in which a `raises: [CatchableError]` was added are due to
code not being part of the package and should probably be changed in a
separate PR (a minimal annotation example follows)
2025-01-14 17:01:02 +01:00
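
For reference, the chronos typed annotation looks like the following; the proc itself is invented for illustration:

```nim
import chronos

# With {.async: (raises: [...]).} the compiler verifies the body can only
# raise the listed exceptions; CancelledError must be listed explicitly.
proc fetchValue(ok: bool): Future[int] {.async: (raises: [ValueError, CancelledError]).} =
  if not ok:
    raise newException(ValueError, "bad input")
  return 42

echo waitFor(fetchValue(true)) # prints 42
```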
diegomrsantos
1fa30f07e8 chore(ci): add arm64 for macOS (#1212)
This PR adds the macOS 14 GitHub runner, which uses the arm64 CPU.
2024-12-20 21:18:56 -04:00
richΛrd
39d0451a10 chore: validate PR titles and commits, and autoassign PRs (#1227) 2024-12-20 15:42:54 +01:00
richΛrd
4dc7a89f45 chore: use latest nimpng and nico (#1229) 2024-12-17 14:37:06 -04:00
richΛrd
fd26f93b80 fix(ci): use nim 2.0 branch (#1228) 2024-12-09 19:41:08 +00:00
Etan Kissling
dd2c74d413 feat(nameresolving): Add {.async: (raises).} annotations (#1214)
Modernize `nameresolving` modules to track `{.async: (raises).}`.

---------

Co-authored-by: kaiserd <1684595+kaiserd@users.noreply.github.com>
Co-authored-by: richΛrd <info@richardramos.me>
2024-12-09 12:43:21 +01:00
Eugene Kabanov
b7e0df127f Fix PeerStore missing remote endpoints of established connection. (#1226) 2024-11-27 14:49:41 +02:00
kaiserd
f591e692fc chore(version): update libp2p.nimble to 1.7.1 (#1223)
Minor version bump for the rendezvous fix
2024-11-09 10:51:33 +07:00
NagyZoltanPeter
8855bce085 fix: missing raises pragma for one of RendezVous.new (#1222) 2024-11-08 11:11:51 +01:00
63 changed files with 1034 additions and 575 deletions

View File

@@ -88,6 +88,8 @@ runs:
run: |
if [[ '${{ inputs.cpu }}' == 'amd64' ]]; then
PLATFORM=x64
elif [[ '${{ inputs.cpu }}' == 'arm64' ]]; then
PLATFORM=arm64
else
PLATFORM=x86
fi

.github/workflows/auto_assign_pr.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
name: Auto Assign PR to Creator
on:
pull_request:
types:
- opened
jobs:
assign_creator:
runs-on: ubuntu-latest
steps:
- uses: toshimaru/auto-author-assign@v1.6.2

View File

@@ -27,15 +27,14 @@ jobs:
cpu: amd64
- os: macos
cpu: amd64
- os: macos-14
cpu: arm64
- os: windows
cpu: amd64
nim:
- ref: version-1-6
memory_management: refc
# The ref below corresponds to the branch "version-2-0".
# Right before an update from Nimble 0.16.1 to 0.16.2.
# That update breaks our dependency resolution.
- ref: 8754469f4947844c5938f56e1fba846c349354b5
- ref: version-2-0
memory_management: refc
include:
- platform:
@@ -50,6 +49,10 @@ jobs:
os: macos
builder: macos-13
shell: bash
- platform:
os: macos-14
builder: macos-14
shell: bash
- platform:
os: windows
builder: windows-2022
@@ -78,7 +81,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '~1.15.5'
go-version: '~1.16.0' # That's the minimum Go version that works with arm.
- name: Install p2pd
run: |
@@ -90,8 +93,8 @@ jobs:
with:
path: nimbledeps
# Using nim.ref as a simple way to differentiate between nimble using the "pkgs" or "pkgs2" directories.
# The change happened on Nimble v0.14.0.
key: nimbledeps-${{ matrix.nim.ref }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
# The change happened on Nimble v0.14.0. Also forcing the deps to be reinstalled on each os and cpu.
key: nimbledeps-${{ matrix.nim.ref }}-${{ matrix.builder }}-${{ matrix.platform.cpu }}-${{ hashFiles('.pinned') }} # hashFiles returns a different value on windows
- name: Install deps
if: ${{ steps.deps-cache.outputs.cache-hit != 'true' }}

View File

@@ -10,5 +10,5 @@ jobs:
name: Daily amd64
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': '8754469f4947844c5938f56e1fba846c349354b5', 'memory_management': 'refc'}]"
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': 'version-2-0', 'memory_management': 'refc'}]"
cpu: "['amd64']"

View File

@@ -75,7 +75,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '~1.15.5'
go-version: '~1.16.0'
cache: false
- name: Install p2pd

View File

@@ -10,6 +10,6 @@ jobs:
name: Daily i386 (Linux)
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': '8754469f4947844c5938f56e1fba846c349354b5', 'memory_management': 'refc'}, {'ref': 'devel', 'memory_management': 'orc'}]"
nim: "[{'ref': 'version-1-6', 'memory_management': 'refc'}, {'ref': 'version-2-0', 'memory_management': 'refc'}, {'ref': 'devel', 'memory_management': 'orc'}]"
cpu: "['i386']"
exclude: "[{'platform': {'os':'macos'}}, {'platform': {'os':'windows'}}]"

View File

@@ -10,6 +10,6 @@ jobs:
name: Daily SAT
uses: ./.github/workflows/daily_common.yml
with:
nim: "[{'ref': '8754469f4947844c5938f56e1fba846c349354b5', 'memory_management': 'refc'}]"
nim: "[{'ref': 'version-2-0', 'memory_management': 'refc'}]"
cpu: "['amd64']"
use_sat_solver: true

View File

@@ -44,7 +44,7 @@ jobs:
git config --global user.email "${{ github.actor }}@users.noreply.github.com"
git config --global user.name = "${{ github.actor }}"
git commit --allow-empty -a -m "auto-bump nim-libp2p"
git branch -D nim-libp2p-auto-bump-${GITHUB_REF##*/} || true
git switch -c nim-libp2p-auto-bump-${GITHUB_REF##*/}
git push -f origin nim-libp2p-auto-bump-${GITHUB_REF##*/}
git branch -D nim-libp2p-auto-bump-${{ matrix.target.ref }} || true
git switch -c nim-libp2p-auto-bump-${{ matrix.target.ref }}
git push -f origin nim-libp2p-auto-bump-${{ matrix.target.ref }}

View File

@@ -18,17 +18,10 @@ jobs:
with:
fetch-depth: 2 # In PR, has extra merge commit: ^1 = PR, ^2 = base
- name: Setup NPH
# Pin nph to a specific version to avoid sudden style differences.
# Updating nph version should be accompanied with running the new version on the fluffy directory.
run: |
VERSION="v0.5.1"
ARCHIVE="nph-linux_x64.tar.gz"
curl -L "https://github.com/arnetheduck/nph/releases/download/${VERSION}/${ARCHIVE}" -o ${ARCHIVE}
tar -xzf ${ARCHIVE}
- name: Check style
run: |
shopt -s extglob # Enable extended globbing
./nph examples libp2p tests tools *.@(nim|nims|nimble)
git diff --exit-code
- name: Check `nph` formatting
uses: arnetheduck/nph-action@v1
with:
version: 0.6.1
options: "examples libp2p tests tools *.nim*"
fail: true
suggest: true

.github/workflows/pr_lint.yml vendored Normal file
View File

@@ -0,0 +1,35 @@
name: "Conventional Commits"
on:
pull_request:
types:
- opened
- edited
- reopened
- synchronize
jobs:
main:
name: Validate PR title
runs-on: ubuntu-latest
permissions:
pull-requests: write
steps:
- uses: amannn/action-semantic-pull-request@v5
id: lint_pr_title
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- uses: marocchino/sticky-pull-request-comment@v2
# When the previous step fails, the workflow stops. By adding this
# condition you can continue the execution with the populated error message.
if: always() && (steps.lint_pr_title.outputs.error_message != null)
with:
header: pr-title-lint-error
message: |
Pull request titles must follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/)
# Delete a previous comment when the issue has been resolved
- if: ${{ steps.lint_pr_title.outputs.error_message == null }}
uses: marocchino/sticky-pull-request-comment@v2
with:
header: pr-title-lint-error
delete: true

View File

@@ -145,9 +145,8 @@ The code follows the [Status Nim Style Guide](https://status-im.github.io/nim-st
<table>
<tbody>
<tr>
<td align="center"><a href="https://github.com/Menduist"><img src="https://avatars.githubusercontent.com/u/13471753?v=4?s=100" width="100px;" alt="Tanguy"/><br /><sub><b>Tanguy (Menduist)</b></sub></a></td>
<td align="center"><a href="https://github.com/lchenut"><img src="https://avatars.githubusercontent.com/u/11214565?v=4?s=100" width="100px;" alt="Ludovic"/><br /><sub><b>Ludovic</b></sub></a></td>
<td align="center"><a href="https://github.com/diegomrsantos"><img src="https://avatars.githubusercontent.com/u/7316595?v=4?s=100" width="100px;" alt="Diego"/><br /><sub><b>Diego</b></sub></a></td>
<td align="center"><a href="https://github.com/richard-ramos"><img src="https://avatars.githubusercontent.com/u/1106587?v=4?s=100" width="100px;" alt="Richard"/><br /><sub><b>Richard</b></sub></a></td>
<td align="center"><a href="https://github.com/vladopajic"><img src="https://avatars.githubusercontent.com/u/4353513?v=4?s=100" width="100px;" alt="Vlado"/><br /><sub><b>Vlado</b></sub></a></td>
</tr>
</tbody>
</table>

View File

@@ -13,7 +13,7 @@ For more information about the go daemon, check out [this repository](https://gi
> **Required only** for running the tests.
# Prerequisites
Go with version `1.15.15`.
Go with version `1.16.0`.
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.
# Installation
@@ -21,7 +21,7 @@ Follow one of the methods below:
## Script
Run the build script while having the `go` command pointing to the correct Go version.
We recommend using `1.15.15`, as previously stated.
We recommend using `1.16.0`, as previously stated.
```sh
./scripts/build_p2pd.sh
```

View File

@@ -79,8 +79,7 @@ proc oneNode(node: Node, rng: ref HmacDrbgContext) {.async.} =
let decoded = MetricList.decode(message.data)
if decoded.isErr:
return ValidationResult.Reject
return ValidationResult.Accept
,
return ValidationResult.Accept,
)
# This "validator" will attach to the `metrics` topic and make sure
# that every message in this topic is valid. This allows us to stop
@@ -93,7 +92,8 @@ proc oneNode(node: Node, rng: ref HmacDrbgContext) {.async.} =
node.gossip.subscribe(
"metrics",
proc(topic: string, data: seq[byte]) {.async.} =
echo MetricList.decode(data).tryGet()
let m = MetricList.decode(data).expect("metric can be decoded")
echo m
,
)
else:

View File

@@ -214,8 +214,7 @@ proc networking(g: Game) {.async.} =
# We are "player 2"
swap(g.localPlayer, g.remotePlayer)
except CatchableError as exc:
discard
,
discard,
)
await switch.start()
@@ -268,14 +267,11 @@ nico.init("Status", "Tron")
nico.createWindow("Tron", mapSize * 4, mapSize * 4, 4, false)
nico.run(
proc() =
discard
,
discard,
proc(dt: float32) =
game.update(dt)
,
game.update(dt),
proc() =
game.draw()
,
game.draw(),
)
waitFor(netFut.cancelAndWait())

View File

@@ -1,7 +1,7 @@
mode = ScriptMode.Verbose
packageName = "libp2p"
version = "1.7.0"
version = "1.8.0"
author = "Status Research & Development GmbH"
description = "LibP2P implementation"
license = "MIT"
@@ -125,9 +125,8 @@ task examples_build, "Build the samples":
buildSample("tutorial_3_protobuf", true)
buildSample("tutorial_4_gossipsub", true)
buildSample("tutorial_5_discovery", true)
exec "nimble install -y nimpng@#HEAD"
# this is to fix broken build on 1.7.3, remove it when nimpng version 0.3.2 or later is released
exec "nimble install -y nico@#af99dd60bf2b395038ece815ea1012330a80d6e6"
exec "nimble install -y nimpng"
exec "nimble install -y nico --passNim=--skipParentCfg"
buildSample("tutorial_6_game", false, "--styleCheck:off")
# pin system

View File

@@ -28,8 +28,7 @@ proc hkdf*[T: sha256, len: static int](
if salt.len > 0:
unsafeAddr salt[0]
else:
nil
,
nil,
csize_t(salt.len),
)
hkdfInject(
@@ -37,8 +36,7 @@ proc hkdf*[T: sha256, len: static int](
if ikm.len > 0:
unsafeAddr ikm[0]
else:
nil
,
nil,
csize_t(ikm.len),
)
hkdfFlip(ctx)
@@ -48,8 +46,7 @@ proc hkdf*[T: sha256, len: static int](
if info.len > 0:
unsafeAddr info[0]
else:
nil
,
nil,
csize_t(info.len),
addr outputs[i][0],
csize_t(outputs[i].len),

View File

@@ -158,11 +158,11 @@ type
key*: PublicKey
P2PStreamCallback* = proc(api: DaemonAPI, stream: P2PStream): Future[void] {.
gcsafe, raises: [CatchableError]
gcsafe, async: (raises: [CatchableError])
.}
P2PPubSubCallback* = proc(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.gcsafe, raises: [CatchableError].}
): Future[bool] {.gcsafe, async: (raises: [CatchableError]).}
DaemonError* = object of LPError
DaemonRemoteError* = object of DaemonError
@@ -485,7 +485,11 @@ proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [DaemonLocalErro
if initProtoBuffer(error).getRequiredField(1, result).isErr():
raise newException(DaemonLocalError, "Error message is missing!")
proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
proc recvMessage(
conn: StreamTransport
): Future[seq[byte]] {.
async: (raises: [TransportIncompleteError, TransportError, CancelledError])
.} =
var
size: uint
length: int
@@ -508,13 +512,19 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
result = buffer
proc newConnection*(api: DaemonAPI): Future[StreamTransport] {.raises: [LPError].} =
result = connect(api.address)
proc newConnection*(
api: DaemonAPI
): Future[StreamTransport] {.
async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError])
.} =
await connect(api.address)
proc closeConnection*(api: DaemonAPI, transp: StreamTransport): Future[void] =
result = transp.closeWait()
proc closeConnection*(
api: DaemonAPI, transp: StreamTransport
): Future[void] {.async: (raises: [CancelledError]).} =
await transp.closeWait()
proc socketExists(address: MultiAddress): Future[bool] {.async.} =
proc socketExists(address: MultiAddress): Future[bool] {.async: (raises: []).} =
try:
var transp = await connect(address)
await transp.closeWait()
@@ -534,7 +544,9 @@ else:
proc getProcessId(): int =
result = int(posix.getpid())
proc getSocket(pattern: string, count: ptr int): Future[MultiAddress] {.async.} =
proc getSocket(
pattern: string, count: ptr int
): Future[MultiAddress] {.async: (raises: [ValueError, LPError]).} =
var sockname = ""
var pid = $getProcessId()
sockname = pattern % [pid, $(count[])]
@@ -562,7 +574,35 @@ proc getSocket(pattern: string, count: ptr int): Future[MultiAddress] {.async.}
closeSocket(sock)
# This is forward declaration needed for newDaemonApi()
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.}
proc listPeers*(
api: DaemonAPI
): Future[seq[PeerInfo]] {.
async: (
raises: [
ValueError, DaemonLocalError, OSError, MaInvalidAddress, TransportError,
CancelledError, LPError,
]
)
.}
template exceptionToAssert(body: untyped): untyped =
block:
var res: type(body)
when defined(nimHasWarnBareExcept):
{.push warning[BareExcept]: off.}
try:
res = body
except OSError as exc:
raise exc
except IOError as exc:
raise exc
except Defect as exc:
raise exc
except Exception as exc:
raiseAssert exc.msg
when defined(nimHasWarnBareExcept):
{.pop.}
res
proc copyEnv(): StringTableRef =
## This procedure copies all environment variables into a StringTable.
@@ -586,7 +626,14 @@ proc newDaemonApi*(
peersRequired = 2,
logFile = "",
logLevel = IpfsLogLevel.Debug,
): Future[DaemonAPI] {.async.} =
): Future[DaemonAPI] {.
async: (
raises: [
ValueError, DaemonLocalError, CancelledError, LPError, OSError, IOError,
AsyncError,
]
)
.} =
## Initialize connection to `go-libp2p-daemon` control socket.
##
## ``flags`` - set of P2PDaemonFlags.
@@ -780,7 +827,7 @@ proc newDaemonApi*(
result = api
proc close*(stream: P2PStream) {.async.} =
proc close*(stream: P2PStream) {.async: (raises: [DaemonLocalError]).} =
## Close ``stream``.
if P2PStreamFlags.Closed notin stream.flags:
await stream.transp.closeWait()
@@ -789,7 +836,9 @@ proc close*(stream: P2PStream) {.async.} =
else:
raise newException(DaemonLocalError, "Stream is already closed!")
proc close*(api: DaemonAPI) {.async.} =
proc close*(
api: DaemonAPI
) {.async: (raises: [TransportOsError, LPError, ValueError, OSError, CancelledError]).} =
## Shutdown connections to `go-libp2p-daemon` control socket.
# await api.pool.close()
# Closing all pending servers.
@@ -827,7 +876,9 @@ template withMessage(m, body: untyped): untyped =
proc transactMessage(
transp: StreamTransport, pb: ProtoBuffer
): Future[ProtoBuffer] {.async.} =
): Future[ProtoBuffer] {.
async: (raises: [DaemonLocalError, TransportError, CancelledError])
.} =
let length = pb.getLen()
let res = await transp.write(pb.getPtr(), length)
if res != length:
@@ -845,7 +896,11 @@ proc getPeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [DaemonLocalError].} =
discard pb.getRepeatedField(2, result.addresses)
proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
proc identity*(
api: DaemonAPI
): Future[PeerInfo] {.
async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError])
.} =
## Get Node identity information
var transp = await api.newConnection()
try:
@@ -860,7 +915,7 @@ proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
proc connect*(
api: DaemonAPI, peer: PeerId, addresses: seq[MultiAddress], timeout = 0
) {.async.} =
) {.async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]).} =
## Connect to remote peer with id ``peer`` and addresses ``addresses``.
var transp = await api.newConnection()
try:
@@ -870,7 +925,9 @@ proc connect*(
except CatchableError:
await api.closeConnection(transp)
proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} =
proc disconnect*(
api: DaemonAPI, peer: PeerId
) {.async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]).} =
## Disconnect from remote peer with id ``peer``.
var transp = await api.newConnection()
try:
@@ -882,7 +939,12 @@ proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} =
proc openStream*(
api: DaemonAPI, peer: PeerId, protocols: seq[string], timeout = 0
): Future[P2PStream] {.async.} =
): Future[P2PStream] {.
async: (
raises:
[MaInvalidAddress, TransportError, CancelledError, LPError, DaemonLocalError]
)
.} =
## Open new stream to peer ``peer`` using one of the protocols in
## ``protocols``. Returns a ``P2PStream`` for the stream.
var transp = await api.newConnection()
@@ -903,9 +965,9 @@ proc openStream*(
stream.flags.incl(Outbound)
stream.transp = transp
result = stream
except CatchableError as exc:
except ResultError[ProtoError]:
await api.closeConnection(transp)
raise exc
raise newException(DaemonLocalError, "Wrong message type!")
proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
var api = getUserData[DaemonAPI](server)
@@ -927,11 +989,28 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
proc addHandler*(
api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback
) {.async, raises: [LPError].} =
) {.
async: (
raises: [
MaInvalidAddress, DaemonLocalError, TransportError, CancelledError, LPError,
ValueError,
]
)
.} =
## Add stream handler ``handler`` for set of protocols ``protocols``.
var transp = await api.newConnection()
let maddress = await getSocket(api.pattern, addr api.ucounter)
var server = createStreamServer(maddress, streamHandler, udata = api)
var removeHandler = proc(): Future[void] {.
async: (raises: [CancelledError, TransportError])
.} =
for item in protocols:
api.handlers.del(item)
server.stop()
server.close()
await server.join()
try:
for item in protocols:
api.handlers[item] = handler
@@ -939,17 +1018,28 @@ proc addHandler*(
var pb = await transp.transactMessage(requestStreamHandler(maddress, protocols))
pb.withMessage:
api.servers.add(P2PServer(server: server, address: maddress))
except CatchableError as exc:
for item in protocols:
api.handlers.del(item)
server.stop()
server.close()
await server.join()
raise exc
except DaemonLocalError as e:
await removeHandler()
raise e
except TransportError as e:
await removeHandler()
raise e
except CancelledError as e:
await removeHandler()
raise e
finally:
await api.closeConnection(transp)
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} =
proc listPeers*(
api: DaemonAPI
): Future[seq[PeerInfo]] {.
async: (
raises: [
ValueError, DaemonLocalError, OSError, MaInvalidAddress, TransportError,
CancelledError, LPError,
]
)
.} =
## Get list of remote peers to which we are currently connected.
var transp = await api.newConnection()
try:
@@ -964,7 +1054,14 @@ proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} =
finally:
await api.closeConnection(transp)
proc cmTagPeer*(api: DaemonAPI, peer: PeerId, tag: string, weight: int) {.async.} =
proc cmTagPeer*(
api: DaemonAPI, peer: PeerId, tag: string, weight: int
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Tag peer with id ``peer`` using ``tag`` and ``weight``.
var transp = await api.newConnection()
try:
@@ -974,7 +1071,14 @@ proc cmTagPeer*(api: DaemonAPI, peer: PeerId, tag: string, weight: int) {.async.
finally:
await api.closeConnection(transp)
proc cmUntagPeer*(api: DaemonAPI, peer: PeerId, tag: string) {.async.} =
proc cmUntagPeer*(
api: DaemonAPI, peer: PeerId, tag: string
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Remove tag ``tag`` from peer with id ``peer``.
var transp = await api.newConnection()
try:
@@ -984,7 +1088,14 @@ proc cmUntagPeer*(api: DaemonAPI, peer: PeerId, tag: string) {.async.} =
finally:
await api.closeConnection(transp)
proc cmTrimPeers*(api: DaemonAPI) {.async.} =
proc cmTrimPeers*(
api: DaemonAPI
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Trim all connections.
var transp = await api.newConnection()
try:
@@ -1058,7 +1169,12 @@ proc getDhtMessageType(
proc dhtFindPeer*(
api: DaemonAPI, peer: PeerId, timeout = 0
): Future[PeerInfo] {.async.} =
): Future[PeerInfo] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Find peer with id ``peer`` and return peer information ``PeerInfo``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1073,7 +1189,12 @@ proc dhtFindPeer*(
proc dhtGetPublicKey*(
api: DaemonAPI, peer: PeerId, timeout = 0
): Future[PublicKey] {.async.} =
): Future[PublicKey] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get peer's public key from peer with id ``peer``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1088,7 +1209,12 @@ proc dhtGetPublicKey*(
proc dhtGetValue*(
api: DaemonAPI, key: string, timeout = 0
): Future[seq[byte]] {.async.} =
): Future[seq[byte]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get value associated with ``key``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1103,7 +1229,12 @@ proc dhtGetValue*(
proc dhtPutValue*(
api: DaemonAPI, key: string, value: seq[byte], timeout = 0
) {.async.} =
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Associate ``value`` with ``key``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1116,7 +1247,14 @@ proc dhtPutValue*(
finally:
await api.closeConnection(transp)
proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} =
proc dhtProvide*(
api: DaemonAPI, cid: Cid, timeout = 0
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Provide content with id ``cid``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1131,7 +1269,12 @@ proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} =
proc dhtFindPeersConnectedToPeer*(
api: DaemonAPI, peer: PeerId, timeout = 0
): Future[seq[PeerInfo]] {.async.} =
): Future[seq[PeerInfo]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Find peers which are connected to peer with id ``peer``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1157,7 +1300,12 @@ proc dhtFindPeersConnectedToPeer*(
proc dhtGetClosestPeers*(
api: DaemonAPI, key: string, timeout = 0
): Future[seq[PeerId]] {.async.} =
): Future[seq[PeerId]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get closest peers for ``key``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1183,7 +1331,12 @@ proc dhtGetClosestPeers*(
proc dhtFindProviders*(
api: DaemonAPI, cid: Cid, count: uint32, timeout = 0
): Future[seq[PeerInfo]] {.async.} =
): Future[seq[PeerInfo]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get ``count`` providers for content with id ``cid``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1209,7 +1362,12 @@ proc dhtFindProviders*(
proc dhtSearchValue*(
api: DaemonAPI, key: string, timeout = 0
): Future[seq[seq[byte]]] {.async.} =
): Future[seq[seq[byte]]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Search for value with ``key``, return list of values found.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@@ -1232,7 +1390,14 @@ proc dhtSearchValue*(
finally:
await api.closeConnection(transp)
proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
proc pubsubGetTopics*(
api: DaemonAPI
): Future[seq[string]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get list of topics this node is subscribed to.
var transp = await api.newConnection()
try:
@@ -1245,7 +1410,14 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
finally:
await api.closeConnection(transp)
proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerId]] {.async.} =
proc pubsubListPeers*(
api: DaemonAPI, topic: string
): Future[seq[PeerId]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get list of peers we are connected to which are also subscribed to topic
## ``topic``.
var transp = await api.newConnection()
@@ -1260,7 +1432,14 @@ proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerId]] {.asyn
finally:
await api.closeConnection(transp)
proc pubsubPublish*(api: DaemonAPI, topic: string, value: seq[byte]) {.async.} =
proc pubsubPublish*(
api: DaemonAPI, topic: string, value: seq[byte]
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Publish message ``value`` to topic ``topic``.
var transp = await api.newConnection()
try:
@@ -1280,7 +1459,13 @@ proc getPubsubMessage*(pb: ProtoBuffer): PubSubMessage =
discard pb.getField(5, result.signature)
discard pb.getField(6, result.key)
proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} =
proc pubsubLoop(
api: DaemonAPI, ticket: PubsubTicket
) {.
async: (
raises: [TransportIncompleteError, TransportError, CancelledError, CatchableError]
)
.} =
while true:
var pbmessage = await ticket.transp.recvMessage()
if len(pbmessage) == 0:
@@ -1296,7 +1481,12 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} =
proc pubsubSubscribe*(
api: DaemonAPI, topic: string, handler: P2PPubSubCallback
): Future[PubsubTicket] {.async.} =
): Future[PubsubTicket] {.
async: (
raises:
[MaInvalidAddress, TransportError, LPError, CancelledError, DaemonLocalError]
)
.} =
## Subscribe to topic ``topic``.
var transp = await api.newConnection()
try:
@@ -1308,7 +1498,13 @@ proc pubsubSubscribe*(
ticket.transp = transp
asyncSpawn pubsubLoop(api, ticket)
result = ticket
except CatchableError as exc:
except DaemonLocalError as exc:
await api.closeConnection(transp)
raise exc
except TransportError as exc:
await api.closeConnection(transp)
raise exc
except CancelledError as exc:
await api.closeConnection(transp)
raise exc

View File

@@ -55,7 +55,7 @@ proc newPool*(
address: TransportAddress,
poolsize: int = DefaultPoolSize,
bufferSize = DefaultStreamBufferSize,
): Future[TransportPool] {.async.} =
): Future[TransportPool] {.async: (raises: [CancelledError]).} =
## Establish pool of connections to address ``address`` with size
## ``poolsize``.
var pool = new TransportPool
@@ -80,7 +80,9 @@ proc newPool*(
pool.state = Connected
result = pool
proc acquire*(pool: TransportPool): Future[StreamTransport] {.async.} =
proc acquire*(
pool: TransportPool
): Future[StreamTransport] {.async: (raises: [CancelledError, TransportPoolError]).} =
## Acquire non-busy connection from pool ``pool``.
var transp: StreamTransport
if pool.state in {Connected}:
@@ -102,7 +104,9 @@ proc acquire*(pool: TransportPool): Future[StreamTransport] {.async.} =
raise newException(TransportPoolError, "Pool is not ready!")
result = transp
proc release*(pool: TransportPool, transp: StreamTransport) =
proc release*(
pool: TransportPool, transp: StreamTransport
) {.async: (raises: [TransportPoolError]).} =
## Release connection ``transp`` back to pool ``pool``.
if pool.state in {Connected, Closing}:
var found = false
@@ -118,7 +122,9 @@ proc release*(pool: TransportPool, transp: StreamTransport) =
else:
raise newException(TransportPoolError, "Pool is not ready!")
proc join*(pool: TransportPool) {.async.} =
proc join*(
pool: TransportPool
) {.async: (raises: [TransportPoolError, CancelledError]).} =
## Waiting for all connections to become available.
if pool.state in {Connected, Closing}:
while true:
@@ -130,7 +136,9 @@ proc join*(pool: TransportPool) {.async.} =
elif pool.state == Connecting:
raise newException(TransportPoolError, "Pool is not ready!")
proc close*(pool: TransportPool) {.async.} =
proc close*(
pool: TransportPool
) {.async: (raises: [TransportPoolError, CancelledError]).} =
## Closes transports pool ``pool`` and release all resources.
if pool.state == Connected:
pool.state = Closing

View File

@@ -38,8 +38,7 @@ proc add*[T](pa: var PeerAttributes, value: T) =
Attribute[T](
value: value,
comparator: proc(f: BaseAttr, c: BaseAttr): bool =
f.ofType(T) and c.ofType(T) and f.to(T) == c.to(T)
,
f.ofType(T) and c.ofType(T) and f.to(T) == c.to(T),
)
)

View File

@@ -26,7 +26,7 @@ import
errors,
utility
import stew/[base58, base32, endians2]
export results, minprotobuf, vbuffer, errors, utility
export results, vbuffer, errors, utility
logScope:
topics = "libp2p multiaddress"

View File

@@ -247,7 +247,7 @@ method close*(m: Mplex) {.async: (raises: []).} =
trace "Closed mplex", m
method getStreams*(m: Mplex): seq[Connection] =
method getStreams*(m: Mplex): seq[Connection] {.gcsafe.} =
for c in m.channels[false].values:
result.add(c)
for c in m.channels[true].values:

View File

@@ -67,5 +67,5 @@ proc new*(
let muxerProvider = T(newMuxer: creator, codec: codec)
muxerProvider
method getStreams*(m: Muxer): seq[Connection] {.base.} =
method getStreams*(m: Muxer): seq[Connection] {.base, gcsafe.} =
raiseAssert("Not implemented!")

View File

@@ -82,8 +82,7 @@ proc `$`(header: YamuxHeader): string =
if a != "":
a & ", " & $b
else:
$b
,
$b,
"",
) & "}, " & "streamId: " & $header.streamId & ", " & "length: " & $header.length &
"}"
@@ -176,8 +175,7 @@ proc `$`(channel: YamuxChannel): string =
if a != "":
a & ", " & b
else:
b
,
b,
"",
) & "}"
@@ -205,6 +203,7 @@ proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedRemotely.isSet():
channel.closedRemotely.fire()
await channel.actuallyClose()
channel.isClosedRemotely = true
method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedLocally:
@@ -632,7 +631,7 @@ method handle*(m: Yamux) {.async: (raises: []).} =
await m.close()
trace "Stopped yamux handler"
method getStreams*(m: Yamux): seq[Connection] =
method getStreams*(m: Yamux): seq[Connection] {.gcsafe.} =
for c in m.channels.values:
result.add(c)

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -10,7 +10,7 @@
{.push raises: [].}
import
std/[streams, strutils, sets, sequtils],
std/[streams, sets, sequtils],
chronos,
chronicles,
stew/byteutils,
@@ -39,24 +39,32 @@ proc questionToBuf(address: string, kind: QKind): seq[byte] =
var buf = newSeq[byte](dataLen)
discard requestStream.readData(addr buf[0], dataLen)
return buf
except CatchableError as exc:
buf
except IOError as exc:
info "Failed to created DNS buffer", description = exc.msg
return newSeq[byte](0)
newSeq[byte](0)
except OSError as exc:
info "Failed to created DNS buffer", description = exc.msg
newSeq[byte](0)
except ValueError as exc:
info "Failed to created DNS buffer", description = exc.msg
newSeq[byte](0)
proc getDnsResponse(
dnsServer: TransportAddress, address: string, kind: QKind
): Future[Response] {.async.} =
): Future[Response] {.
async: (raises: [CancelledError, IOError, OSError, TransportError, ValueError])
.} =
var sendBuf = questionToBuf(address, kind)
if sendBuf.len == 0:
raise newException(ValueError, "Incorrect DNS query")
let receivedDataFuture = newFuture[void]()
let receivedDataFuture = Future[void].Raising([CancelledError]).init()
proc datagramDataReceived(
transp: DatagramTransport, raddr: TransportAddress
): Future[void] {.async, closure.} =
): Future[void] {.async: (raises: []), closure.} =
receivedDataFuture.complete()
let sock =
@@ -68,27 +76,41 @@ proc getDnsResponse(
try:
await sock.sendTo(dnsServer, addr sendBuf[0], sendBuf.len)
await receivedDataFuture or sleepAsync(5.seconds) #unix default
if not receivedDataFuture.finished:
try:
await receivedDataFuture.wait(5.seconds) #unix default
except AsyncTimeoutError:
raise newException(IOError, "DNS server timeout")
let rawResponse = sock.getMessage()
# parseResponse can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
return exceptionToAssert:
try:
parseResponse(string.fromBytes(rawResponse))
except IOError as exc:
raise exc
except OSError as exc:
raise exc
except ValueError as exc:
raise exc
except Exception as exc:
# Nim 1.6: parseResponse can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert exc.msg
finally:
await sock.closeWait()
method resolveIp*(
self: DnsResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError])
.} =
trace "Resolving IP using DNS", address, servers = self.nameServers.mapIt($it), domain
for _ in 0 ..< self.nameServers.len:
let server = self.nameServers[0]
var responseFutures: seq[Future[Response]]
var responseFutures: seq[
Future[Response].Raising(
[CancelledError, IOError, OSError, TransportError, ValueError]
)
]
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
responseFutures.add(getDnsResponse(server, address, A))
@@ -103,23 +125,32 @@ method resolveIp*(
var
resolvedAddresses: OrderedSet[string]
resolveFailed = false
template handleFail(e): untyped =
info "Failed to query DNS", address, error = e.msg
resolveFailed = true
break
for fut in responseFutures:
try:
let resp = await fut
for answer in resp.answers:
# toString can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
resolvedAddresses.incl(exceptionToAssert(answer.toString()))
resolvedAddresses.incl(answer.toString())
except CancelledError as e:
raise e
except ValueError as e:
info "Invalid DNS query", address, error = e.msg
return @[]
except CatchableError as e:
info "Failed to query DNS", address, error = e.msg
resolveFailed = true
break
except IOError as e:
handleFail(e)
except OSError as e:
handleFail(e)
except TransportError as e:
handleFail(e)
except Exception as e:
# Nim 1.6: answer.toString can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert e.msg
if resolveFailed:
self.nameServers.add(self.nameServers[0])
@@ -132,27 +163,39 @@ method resolveIp*(
debug "Failed to resolve address, returning empty set"
return @[]
method resolveTxt*(self: DnsResolver, address: string): Future[seq[string]] {.async.} =
method resolveTxt*(
self: DnsResolver, address: string
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
trace "Resolving TXT using DNS", address, servers = self.nameServers.mapIt($it)
for _ in 0 ..< self.nameServers.len:
let server = self.nameServers[0]
try:
# toString can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
let response = await getDnsResponse(server, address, TXT)
return exceptionToAssert:
trace "Got TXT response",
server = $server, answer = response.answers.mapIt(it.toString())
response.answers.mapIt(it.toString())
except CancelledError as e:
raise e
except CatchableError as e:
template handleFail(e): untyped =
info "Failed to query DNS", address, error = e.msg
self.nameServers.add(self.nameServers[0])
self.nameServers.delete(0)
continue
try:
let response = await getDnsResponse(server, address, TXT)
trace "Got TXT response",
server = $server, answer = response.answers.mapIt(it.toString())
return response.answers.mapIt(it.toString())
except CancelledError as e:
raise e
except IOError as e:
handleFail(e)
except OSError as e:
handleFail(e)
except TransportError as e:
handleFail(e)
except ValueError as e:
handleFail(e)
except Exception as e:
# Nim 1.6: toString can have a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert e.msg
debug "Failed to resolve TXT, returning empty set"
return @[]

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -25,17 +25,25 @@ type MockResolver* = ref object of NameResolver
method resolveIp*(
self: MockResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError])
.} =
var res: seq[TransportAddress]
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, false)):
result.add(initTAddress(resp, port))
res.add(initTAddress(resp, port))
if domain == Domain.AF_INET6 or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, true)):
result.add(initTAddress(resp, port))
res.add(initTAddress(resp, port))
method resolveTxt*(self: MockResolver, address: string): Future[seq[string]] {.async.} =
return self.txtResponses.getOrDefault(address)
res
method resolveTxt*(
self: MockResolver, address: string
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
self.txtResponses.getOrDefault(address)
proc new*(T: typedesc[MockResolver]): T =
T()

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -9,7 +9,7 @@
{.push raises: [].}
import std/[sugar, sets, sequtils, strutils]
import std/[sets, sequtils, strutils]
import chronos, chronicles, stew/endians2
import ".."/[multiaddress, multicodec]
@@ -20,19 +20,17 @@ type NameResolver* = ref object of RootObj
method resolveTxt*(
self: NameResolver, address: string
): Future[seq[string]] {.async, base.} =
): Future[seq[string]] {.async: (raises: [CancelledError]), base.} =
## Get TXT record
##
doAssert(false, "Not implemented!")
raiseAssert "Not implemented!"
method resolveIp*(
self: NameResolver, address: string, port: Port, domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async, base.} =
): Future[seq[TransportAddress]] {.
async: (raises: [CancelledError, TransportAddressError]), base
.} =
## Resolve the specified address
##
doAssert(false, "Not implemented!")
raiseAssert "Not implemented!"
proc getHostname*(ma: MultiAddress): string =
let
@@ -46,30 +44,40 @@ proc getHostname*(ma: MultiAddress): string =
proc resolveOneAddress(
self: NameResolver, ma: MultiAddress, domain: Domain = Domain.AF_UNSPEC, prefix = ""
): Future[seq[MultiAddress]] {.async.} =
#Resolve a single address
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
# Resolve a single address
let portPart = ma[1].valueOr:
raise maErr error
var pbuf: array[2, byte]
var dnsval = getHostname(ma)
if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0:
raise newException(MaError, "Incorrect port number")
let plen = portPart.protoArgument(pbuf).valueOr:
raise maErr error
if plen == 0:
raise maErr "Incorrect port number"
let
port = Port(fromBytesBE(uint16, pbuf))
dnsval = getHostname(ma)
resolvedAddresses = await self.resolveIp(prefix & dnsval, port, domain)
return collect(newSeqOfCap(4)):
for address in resolvedAddresses:
var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet()
for part in ma:
if DNS.match(part.tryGet()):
continue
createdAddress &= part.tryGet()
createdAddress
resolvedAddresses.mapIt:
let address = MultiAddress.init(it).valueOr:
raise maErr error
var createdAddress = address[0].valueOr:
raise maErr error
for part in ma:
let part = part.valueOr:
raise maErr error
if DNS.match(part):
continue
createdAddress &= part
createdAddress
proc resolveDnsAddr*(
self: NameResolver, ma: MultiAddress, depth: int = 0
): Future[seq[MultiAddress]] {.async.} =
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
if not DNSADDR.matchPartial(ma):
return @[ma]
@@ -78,54 +86,67 @@ proc resolveDnsAddr*(
info "Stopping DNSADDR recursion, probably malicious", ma
return @[]
var dnsval = getHostname(ma)
let txt = await self.resolveTxt("_dnsaddr." & dnsval)
let
dnsval = getHostname(ma)
txt = await self.resolveTxt("_dnsaddr." & dnsval)
trace "txt entries", txt
var result: seq[MultiAddress]
const codec = multiCodec("p2p")
let maCodec = block:
let hasCodec = ma.contains(codec).valueOr:
raise maErr error
if hasCodec:
ma[codec]
else:
(static(default(MaResult[MultiAddress])))
var res: seq[MultiAddress]
for entry in txt:
if not entry.startsWith("dnsaddr="):
continue
let entryValue = MultiAddress.init(entry[8 ..^ 1]).tryGet()
if entryValue.contains(multiCodec("p2p")).tryGet() and
ma.contains(multiCodec("p2p")).tryGet():
if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]:
continue
let
entryValue = MultiAddress.init(entry[8 ..^ 1]).valueOr:
raise maErr error
entryHasCodec = entryValue.contains(multiCodec("p2p")).valueOr:
raise maErr error
if entryHasCodec and maCodec.isOk and entryValue[codec] != maCodec:
continue
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
for r in resolved:
result.add(r)
res.add(r)
if result.len == 0:
if res.len == 0:
debug "Failed to resolve a DNSADDR", ma
return @[]
return result
res
proc resolveMAddress*(
self: NameResolver, address: MultiAddress
): Future[seq[MultiAddress]] {.async.} =
): Future[seq[MultiAddress]] {.
async: (raises: [CancelledError, MaError, TransportAddressError])
.} =
var res = initOrderedSet[MultiAddress]()
if not DNS.matchPartial(address):
res.incl(address)
else:
let code = address[0].tryGet().protoCode().tryGet()
let seq =
case code
of multiCodec("dns"):
await self.resolveOneAddress(address)
of multiCodec("dns4"):
await self.resolveOneAddress(address, Domain.AF_INET)
of multiCodec("dns6"):
await self.resolveOneAddress(address, Domain.AF_INET6)
of multiCodec("dnsaddr"):
await self.resolveDnsAddr(address)
else:
assert false
@[address]
for ad in seq:
let
firstPart = address[0].valueOr:
raise maErr error
code = firstPart.protoCode().valueOr:
raise maErr error
ads =
case code
of multiCodec("dns"):
await self.resolveOneAddress(address)
of multiCodec("dns4"):
await self.resolveOneAddress(address, Domain.AF_INET)
of multiCodec("dns6"):
await self.resolveOneAddress(address, Domain.AF_INET6)
of multiCodec("dnsaddr"):
await self.resolveDnsAddr(address)
else:
raise maErr("Unsupported codec " & $code)
for ad in ads:
res.incl(ad)
return res.toSeq
res.toSeq

View File

@@ -63,6 +63,7 @@ type
KeyBook* {.public.} = ref object of PeerBook[PublicKey]
AgentBook* {.public.} = ref object of PeerBook[string]
LastSeenBook* {.public.} = ref object of PeerBook[Opt[MultiAddress]]
ProtoVersionBook* {.public.} = ref object of PeerBook[string]
SPRBook* {.public.} = ref object of PeerBook[Envelope]
@@ -145,10 +146,16 @@ proc del*(peerStore: PeerStore, peerId: PeerId) {.public.} =
for _, book in peerStore.books:
book.deletor(peerId)
proc updatePeerInfo*(peerStore: PeerStore, info: IdentifyInfo) =
if info.addrs.len > 0:
proc updatePeerInfo*(
peerStore: PeerStore,
info: IdentifyInfo,
observedAddr: Opt[MultiAddress] = Opt.none(MultiAddress),
) =
if len(info.addrs) > 0:
peerStore[AddressBook][info.peerId] = info.addrs
peerStore[LastSeenBook][info.peerId] = observedAddr
info.pubkey.withValue(pubkey):
peerStore[KeyBook][info.peerId] = pubkey
@@ -200,7 +207,7 @@ proc identify*(peerStore: PeerStore, muxer: Muxer) {.async.} =
knownAgent = shortAgent
muxer.connection.setShortAgent(knownAgent)
peerStore.updatePeerInfo(info)
peerStore.updatePeerInfo(info, stream.observedAddr)
finally:
await stream.closeWithEOF()
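
With the `LastSeenBook` added above, the endpoint observed during identify can be read back later. A hedged lookup sketch; the import paths and book access pattern are assumed from this diff:

```nim
import chronicles
import libp2p/[peerid, peerstore] # assumed import paths

proc logLastSeen(peerStore: PeerStore, peerId: PeerId) =
  # LastSeenBook stores Opt[MultiAddress]: the observed remote endpoint,
  # or none if the peer has not completed identify
  let lastSeen = peerStore[LastSeenBook][peerId]
  if lastSeen.isSome:
    info "peer last observed at", peerId, address = $lastSeen.get()
```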

View File

@@ -16,8 +16,6 @@ export results, utility
{.push public.}
const MaxMessageSize = 1'u shl 22
type
ProtoFieldKind* = enum
## Protobuf's field types enum
@@ -39,7 +37,6 @@ type
buffer*: seq[byte]
offset*: int
length*: int
maxSize*: uint
ProtoHeader* = object
wire*: ProtoFieldKind
@@ -63,7 +60,6 @@ type
VarintDecode
MessageIncomplete
BufferOverflow
MessageTooBig
BadWireType
IncorrectBlob
RequiredFieldMissing
@@ -99,11 +95,14 @@ template getProtoHeader*(field: ProtoField): uint64 =
template toOpenArray*(pb: ProtoBuffer): untyped =
toOpenArray(pb.buffer, pb.offset, len(pb.buffer) - 1)
template lenu64*(x: untyped): untyped =
uint64(len(x))
template isEmpty*(pb: ProtoBuffer): bool =
len(pb.buffer) - pb.offset <= 0
template isEnough*(pb: ProtoBuffer, length: int): bool =
len(pb.buffer) - pb.offset - length >= 0
template isEnough*(pb: ProtoBuffer, length: uint64): bool =
pb.offset <= len(pb.buffer) and length <= uint64(len(pb.buffer) - pb.offset)
template getPtr*(pb: ProtoBuffer): pointer =
cast[pointer](unsafeAddr pb.buffer[pb.offset])
@@ -127,33 +126,25 @@ proc vsizeof*(field: ProtoField): int {.inline.} =
0
proc initProtoBuffer*(
data: seq[byte], offset = 0, options: set[ProtoFlags] = {}, maxSize = MaxMessageSize
data: seq[byte], offset = 0, options: set[ProtoFlags] = {}
): ProtoBuffer =
## Initialize ProtoBuffer with shallow copy of ``data``.
result.buffer = data
result.offset = offset
result.options = options
result.maxSize = maxSize
proc initProtoBuffer*(
data: openArray[byte],
offset = 0,
options: set[ProtoFlags] = {},
maxSize = MaxMessageSize,
data: openArray[byte], offset = 0, options: set[ProtoFlags] = {}
): ProtoBuffer =
## Initialize ProtoBuffer with copy of ``data``.
result.buffer = @data
result.offset = offset
result.options = options
result.maxSize = maxSize
proc initProtoBuffer*(
options: set[ProtoFlags] = {}, maxSize = MaxMessageSize
): ProtoBuffer =
proc initProtoBuffer*(options: set[ProtoFlags] = {}): ProtoBuffer =
## Initialize ProtoBuffer with new sequence of capacity ``cap``.
result.buffer = newSeq[byte]()
result.options = options
result.maxSize = maxSize
if WithVarintLength in options:
# Our buffer will start from position 10, so we can store length of buffer
# in [0, 9].
@@ -194,12 +185,12 @@ proc write*[T: ProtoScalar](pb: var ProtoBuffer, field: int, value: T) =
doAssert(vres.isOk())
pb.offset += length
elif T is float32:
doAssert(pb.isEnough(sizeof(T)))
doAssert(pb.isEnough(uint64(sizeof(T))))
let u32 = cast[uint32](value)
pb.buffer[pb.offset ..< pb.offset + sizeof(T)] = u32.toBytesLE()
pb.offset += sizeof(T)
elif T is float64:
doAssert(pb.isEnough(sizeof(T)))
doAssert(pb.isEnough(uint64(sizeof(T))))
let u64 = cast[uint64](value)
pb.buffer[pb.offset ..< pb.offset + sizeof(T)] = u64.toBytesLE()
pb.offset += sizeof(T)
@@ -242,12 +233,12 @@ proc writePacked*[T: ProtoScalar](
doAssert(vres.isOk())
pb.offset += length
elif T is float32:
doAssert(pb.isEnough(sizeof(T)))
doAssert(pb.isEnough(uint64(sizeof(T))))
let u32 = cast[uint32](item)
pb.buffer[pb.offset ..< pb.offset + sizeof(T)] = u32.toBytesLE()
pb.offset += sizeof(T)
elif T is float64:
doAssert(pb.isEnough(sizeof(T)))
doAssert(pb.isEnough(uint64(sizeof(T))))
let u64 = cast[uint64](item)
pb.buffer[pb.offset ..< pb.offset + sizeof(T)] = u64.toBytesLE()
pb.offset += sizeof(T)
@@ -268,7 +259,7 @@ proc write*[T: byte | char](pb: var ProtoBuffer, field: int, value: openArray[T]
doAssert(lres.isOk())
pb.offset += length
if len(value) > 0:
doAssert(pb.isEnough(len(value)))
doAssert(pb.isEnough(value.lenu64))
copyMem(addr pb.buffer[pb.offset], unsafeAddr value[0], len(value))
pb.offset += len(value)
@@ -327,13 +318,13 @@ proc skipValue(data: var ProtoBuffer, header: ProtoHeader): ProtoResult[void] =
else:
err(ProtoError.VarintDecode)
of ProtoFieldKind.Fixed32:
if data.isEnough(sizeof(uint32)):
if data.isEnough(uint64(sizeof(uint32))):
data.offset += sizeof(uint32)
ok()
else:
err(ProtoError.VarintDecode)
of ProtoFieldKind.Fixed64:
if data.isEnough(sizeof(uint64)):
if data.isEnough(uint64(sizeof(uint64))):
data.offset += sizeof(uint64)
ok()
else:
@@ -343,14 +334,11 @@ proc skipValue(data: var ProtoBuffer, header: ProtoHeader): ProtoResult[void] =
var bsize = 0'u64
if PB.getUVarint(data.toOpenArray(), length, bsize).isOk():
data.offset += length
if bsize <= uint64(data.maxSize):
if data.isEnough(int(bsize)):
data.offset += int(bsize)
ok()
else:
err(ProtoError.MessageIncomplete)
if data.isEnough(bsize):
data.offset += int(bsize)
ok()
else:
err(ProtoError.MessageTooBig)
err(ProtoError.MessageIncomplete)
else:
err(ProtoError.VarintDecode)
of ProtoFieldKind.StartGroup, ProtoFieldKind.EndGroup:
@@ -382,7 +370,7 @@ proc getValue[T: ProtoScalar](
err(ProtoError.VarintDecode)
elif T is float32:
doAssert(header.wire == ProtoFieldKind.Fixed32)
if data.isEnough(sizeof(float32)):
if data.isEnough(uint64(sizeof(float32))):
outval = cast[float32](fromBytesLE(uint32, data.toOpenArray()))
data.offset += sizeof(float32)
ok()
@@ -390,7 +378,7 @@ proc getValue[T: ProtoScalar](
err(ProtoError.MessageIncomplete)
elif T is float64:
doAssert(header.wire == ProtoFieldKind.Fixed64)
if data.isEnough(sizeof(float64)):
if data.isEnough(uint64(sizeof(float64))):
outval = cast[float64](fromBytesLE(uint64, data.toOpenArray()))
data.offset += sizeof(float64)
ok()
@@ -410,22 +398,19 @@ proc getValue[T: byte | char](
outLength = 0
if PB.getUVarint(data.toOpenArray(), length, bsize).isOk():
data.offset += length
if bsize <= uint64(data.maxSize):
if data.isEnough(int(bsize)):
outLength = int(bsize)
if len(outBytes) >= int(bsize):
if bsize > 0'u64:
copyMem(addr outBytes[0], addr data.buffer[data.offset], int(bsize))
data.offset += int(bsize)
ok()
else:
# Buffer overflow should not be critical failure
data.offset += int(bsize)
err(ProtoError.BufferOverflow)
if data.isEnough(bsize):
outLength = int(bsize)
if len(outBytes) >= int(bsize):
if bsize > 0'u64:
copyMem(addr outBytes[0], addr data.buffer[data.offset], int(bsize))
data.offset += int(bsize)
ok()
else:
err(ProtoError.MessageIncomplete)
# Buffer overflow should not be critical failure
data.offset += int(bsize)
err(ProtoError.BufferOverflow)
else:
err(ProtoError.MessageTooBig)
err(ProtoError.MessageIncomplete)
else:
err(ProtoError.VarintDecode)
@@ -439,17 +424,14 @@ proc getValue[T: seq[byte] | string](
if PB.getUVarint(data.toOpenArray(), length, bsize).isOk():
data.offset += length
if bsize <= uint64(data.maxSize):
if data.isEnough(int(bsize)):
outBytes.setLen(bsize)
if bsize > 0'u64:
copyMem(addr outBytes[0], addr data.buffer[data.offset], int(bsize))
data.offset += int(bsize)
ok()
else:
err(ProtoError.MessageIncomplete)
if data.isEnough(bsize):
outBytes.setLen(bsize)
if bsize > 0'u64:
copyMem(addr outBytes[0], addr data.buffer[data.offset], int(bsize))
data.offset += int(bsize)
ok()
else:
err(ProtoError.MessageTooBig)
err(ProtoError.MessageIncomplete)
else:
err(ProtoError.VarintDecode)

View File

@@ -12,6 +12,7 @@
import stew/[results, objects]
import chronos, chronicles
import ../../../multiaddress, ../../../peerid, ../../../errors
import ../../../protobuf/minprotobuf
logScope:
topics = "libp2p autonat"

View File

@@ -47,8 +47,7 @@ proc sendResponseError(
if text == "":
Opt.none(string)
else:
Opt.some(text)
,
Opt.some(text),
ma: Opt.none(MultiAddress),
).encode()
await conn.writeLp(pb.buffer)

View File

@@ -15,6 +15,7 @@ import chronos
import stew/objects
import ../../../multiaddress, ../../../errors, ../../../stream/connection
import ../../../protobuf/minprotobuf
export multiaddress

View File

@@ -12,6 +12,7 @@
import macros
import stew/[objects, results]
import ../../../peerinfo, ../../../signed_envelope
import ../../../protobuf/minprotobuf
# Circuit Relay V1 Message

View File

@@ -334,8 +334,7 @@ proc setup*(r: Relay, switch: Switch) =
r.switch = switch
r.switch.addPeerEventHandler(
proc(peerId: PeerId, event: PeerEvent) {.async.} =
r.rsvp.del(peerId)
,
r.rsvp.del(peerId),
Left,
)

View File

@@ -92,8 +92,7 @@ proc new*(
when maxIncomingStreams is int:
Opt.some(maxIncomingStreams)
else:
maxIncomingStreams
,
maxIncomingStreams,
)
proc new*[E](

View File

@@ -1,8 +1,8 @@
import ./pubsub/[pubsub, floodsub, gossipsub]
import ./pubsub/[pubsub, floodsub, gossipsub, errors]
## Home of PubSub & its implementations:
## | **pubsub**: base interface for pubsub implementations
## | **floodsub**: simple flood-based publishing
## | **gossipsub**: more sophisticated gossip based publishing
export pubsub, floodsub, gossipsub
export pubsub, floodsub, gossipsub, errors

View File

@@ -1,9 +1,15 @@
# this module will be further extended in PR
# https://github.com/status-im/nim-libp2p/pull/107/
import ../../utility
import ../../utility, ../../errors
type ValidationResult* {.pure, public.} = enum
Accept
Reject
Ignore
type PublishOutcome* {.pure, public.} = enum
NoTopicSpecified
DuplicateMessage
NoPeersToPublish
CannotGenerateMessageId
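With distinct outcomes, callers can branch on the reason a publish went nowhere instead of inspecting a bare `0`. A hypothetical caller, assuming the `PublishResult` alias introduced in pubsub.nim below:

# Hypothetical caller of the result-based publish API.
let res = await gossip.doPublish("my-topic", payload)
if res.isErr:
  case res.error
  of NoTopicSpecified: debug "empty topic, nothing sent"
  of DuplicateMessage: trace "message was already seen"
  of NoPeersToPublish: debug "no peers subscribed to the topic"
  of CannotGenerateMessageId: warn "message id provider failed"
else:
  trace "published", peers = res.get()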

View File

@@ -24,6 +24,7 @@ import
../../peerinfo,
../../utility
export pubsub
## Simple flood-based publishing.
logScope:
@@ -101,10 +102,12 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =
procCall PubSub(f).unsubscribePeer(peer)
method rpcHandler*(f: FloodSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
method rpcHandler*(
f: FloodSub, peer: PubSubPeer, data: seq[byte]
) {.async: (raises: [CancelledError, PeerMessageDecodeError, PeerRateLimitError]).} =
var rpcMsg = decodeRpcMsg(data).valueOr:
debug "failed to decode msg from peer", peer, err = error
raise newException(CatchableError, "Peer msg couldn't be decoded")
raise newException(PeerMessageDecodeError, "Peer msg couldn't be decoded")
trace "decoded msg from peer", peer, payload = rpcMsg.shortLog
# trigger hooks
@@ -186,45 +189,38 @@ method init*(f: FloodSub) =
# This is a top-level procedure which will work as a separate task, so it
# does not need to propagate CancelledError.
trace "Unexpected cancellation in floodsub handler", conn
except CatchableError as exc:
trace "FloodSub handler leaks an error", description = exc.msg, conn
f.handler = handler
f.codec = FloodSubCodec
method publish*(f: FloodSub, topic: string, data: seq[byte]): Future[int] {.async.} =
method doPublish*(
f: FloodSub, topic: string, data: seq[byte]
): Future[PublishResult] {.async: (raises: []).} =
# base always returns 0
discard await procCall PubSub(f).publish(topic, data)
discard await procCall PubSub(f).doPublish(topic, data)
trace "Publishing message on topic", data = data.shortLog, topic
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish", topic
return 0
return err(NoTopicSpecified)
let peers = f.floodsub.getOrDefault(topic)
if peers.len == 0:
debug "No peers for topic, skipping publish", topic
return 0
return err(NoPeersToPublish)
let
msg =
if f.anonymize:
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
inc f.msgSeqno
Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign)
msgId = f.msgIdProvider(msg).valueOr:
trace "Error generating message id, skipping publish", error = error
return 0
let (msg, msgId) = f.createMessage(topic, data).valueOr:
trace "Error creating message, skipping publish", error = error
return err(CannotGenerateMessageId)
trace "Created new message", payload = shortLog(msg), peers = peers.len, topic, msgId
if f.addSeen(f.salt(msgId)):
# custom msgid providers might cause this
trace "Dropping already-seen message", msgId, topic
return 0
return err(DuplicateMessage)
# Try to send to all peers that are known to be interested
f.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
@@ -234,7 +230,7 @@ method publish*(f: FloodSub, topic: string, data: seq[byte]): Future[int] {.asyn
trace "Published message to peers", msgId, topic
return peers.len
return ok(peers.len)
method initPubSub*(f: FloodSub) {.raises: [InitializationError].} =
procCall PubSub(f).initPubSub()

View File

@@ -15,7 +15,6 @@ import std/[sets, sequtils]
import chronos, chronicles, metrics
import chronos/ratelimit
import
./pubsub,
./floodsub,
./pubsubpeer,
./peertable,
@@ -34,7 +33,7 @@ export results
import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
export types, scoring, behavior, pubsub
export types, scoring, behavior, floodsub
logScope:
topics = "libp2p gossipsub"
@@ -102,6 +101,7 @@ proc init*(
overheadRateLimit = Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit = false,
maxNumElementsInNonPriorityQueue = DefaultMaxNumElementsInNonPriorityQueue,
sendIDontWantOnPublish = false,
): GossipSubParams =
GossipSubParams(
explicit: true,
@@ -139,6 +139,7 @@ proc init*(
overheadRateLimit: overheadRateLimit,
disconnectPeerAboveRateLimit: disconnectPeerAboveRateLimit,
maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue,
sendIDontWantOnPublish: sendIDontWantOnPublish,
)
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
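The new `sendIDontWantOnPublish` knob defaults to off; enabling it when building the parameters is a one-liner (a sketch, with every other field keeping the defaults shown in this hunk):

# Opt in to pre-publish IDONTWANT broadcasting for large messages.
let params = GossipSubParams.init(sendIDontWantOnPublish = true)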
@@ -219,8 +220,6 @@ method init*(g: GossipSub) =
# This is a top-level procedure which will work as a separate task, so it
# does not need to propagate CancelledError.
trace "Unexpected cancellation in gossipsub handler", conn
except CatchableError as exc:
trace "GossipSub handler leaks an error", description = exc.msg, conn
g.handler = handler
g.codecs &= GossipSubCodec_12
@@ -383,9 +382,44 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
trace "sending iwant reply messages", peer
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
proc sendIDontWant(
g: GossipSub,
msg: Message,
msgId: MessageId,
peersToSendIDontWant: HashSet[PubSubPeer],
) =
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless of whether it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
# IDONTWANT is only supported by >= GossipSubCodec_12
let peers = peersToSendIDontWant.filterIt(
it.codec != GossipSubCodec_10 and it.codec != GossipSubCodec_11
)
if peers.len > 0:
g.broadcast(
peers,
RPCMsg(
control: some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
const iDontWantMessageSizeThreshold* = 512
proc isLargeMessage(msg: Message, msgId: MessageId): bool =
msg.data.len > max(iDontWantMessageSizeThreshold, msgId.len * 10)
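Concretely, with the default 512-byte threshold and a 32-byte message id the cut-off is max(512, 32 * 10) = 512 bytes; only ids longer than 51 bytes raise it above the constant. Assuming `MessageId` is a byte sequence as in the rpc messages module:

# 513 bytes of payload is "large" for a 32-byte id, 512 bytes is not.
let msgId: MessageId = newSeq[byte](32)
doAssert isLargeMessage(Message(data: newSeq[byte](513)), msgId)
doAssert not isLargeMessage(Message(data: newSeq[byte](512)), msgId)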
proc validateAndRelay(
g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer
) {.async.} =
) {.async: (raises: []).} =
try:
template topic(): string =
msg.topic
@@ -399,29 +433,10 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
toSendPeers.excl(peer)
if msg.data.len > max(512, msgId.len * 10):
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless of whether it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
if isLargeMessage(msg, msgId):
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
g.sendIDontWant(msg, msgId, peersToSendIDontWant)
let validation = await g.validate(msg)
@@ -490,7 +505,9 @@ proc validateAndRelay(
)
await handleData(g, topic, msg.data)
except CatchableError as exc:
except CancelledError as exc:
info "validateAndRelay failed", description = exc.msg
except PeerRateLimitError as exc:
info "validateAndRelay failed", description = exc.msg
proc dataAndTopicsIdSize(msgs: seq[Message]): int =
@@ -511,7 +528,9 @@ proc messageOverhead(g: GossipSub, msg: RPCMsg, msgSize: int): int =
msgSize - payloadSize - controlSize
proc rateLimit*(g: GossipSub, peer: PubSubPeer, overhead: int) {.async.} =
proc rateLimit*(
g: GossipSub, peer: PubSubPeer, overhead: int
) {.async: (raises: [PeerRateLimitError]).} =
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
if not overheadRateLimit.tryConsume(overhead):
libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()])
@@ -524,7 +543,9 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, overhead: int) {.async.} =
PeerRateLimitError, "Peer disconnected because it's above rate limit."
)
method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
method rpcHandler*(
g: GossipSub, peer: PubSubPeer, data: seq[byte]
) {.async: (raises: [CancelledError, PeerMessageDecodeError, PeerRateLimitError]).} =
let msgSize = data.len
var rpcMsg = decodeRpcMsg(data).valueOr:
debug "failed to decode msg from peer", peer, err = error
@@ -534,7 +555,7 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
# TODO evaluate behaviour penalty values
peer.behaviourPenalty += 0.1
raise newException(CatchableError, "Peer msg couldn't be decoded")
raise newException(PeerMessageDecodeError, "Peer msg couldn't be decoded")
when defined(libp2p_expensive_metrics):
for m in rpcMsg.messages:
@@ -682,19 +703,9 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
# Send unsubscribe (in reverse order to sub/graft)
procCall PubSub(g).onTopicSubscription(topic, subscribed)
method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.async.} =
logScope:
topic
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish"
return 0
# base always returns 0
discard await procCall PubSub(g).publish(topic, data)
trace "Publishing message on topic", data = data.shortLog
proc collectPeersForPublish(
g: GossipSub, topic: string, payloadLen: int
): Result[HashSet[PubSubPeer], void] {.raises: [].} =
var peers: HashSet[PubSubPeer]
# add always direct peers
@@ -713,7 +724,7 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy
let
bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000
# Divisions are to convert it to bytes per ms. TODO: replace with bandwidth estimate
msToTransmit = max(data.len div bandwidth, 1)
msToTransmit = max(payloadLen div bandwidth, 1)
max(
g.parameters.heartbeatInterval.milliseconds div msToTransmit,
g.parameters.dLow,
@@ -748,25 +759,37 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy
g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)
if peers.len == 0:
let topicPeers = g.gossipsub.getOrDefault(topic).toSeq()
debug "No peers for topic, skipping publish",
peersOnTopic = topicPeers.len,
connectedPeers = topicPeers.filterIt(it.connected).len,
topic
libp2p_gossipsub_failed_publish.inc()
return 0
return err()
let
msg =
if g.anonymize:
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
inc g.msgSeqno
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
msgId = g.msgIdProvider(msg).valueOr:
trace "Error generating message id, skipping publish", error = error
libp2p_gossipsub_failed_publish.inc()
return 0
return ok(peers)
method doPublish*(
g: GossipSub, topic: string, data: seq[byte]
): Future[PublishResult] {.async: (raises: []).} =
logScope:
topic
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish"
return err(NoTopicSpecified)
# base always returns 0
discard await procCall PubSub(g).doPublish(topic, data)
trace "Publishing message on topic", data = data.shortLog
var peers = g.collectPeersForPublish(topic, data.len).valueOr:
let
topicPeers = g.gossipsub.getOrDefault(topic)
connectedPeersLen = topicPeers.filterIt(it.connected).len()
debug "No peers for topic, skipping publish",
peersOnTopic = topicPeers.len, connectedPeers = connectedPeersLen, topic
libp2p_gossipsub_failed_publish.inc()
return err(NoPeersToPublish)
let (msg, msgId) = g.createMessage(topic, data).valueOr:
error "Error creating message, skipping publish", error = error
return err(CannotGenerateMessageId)
logScope:
msgId = shortLog(msgId)
@@ -778,10 +801,13 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy
# this might happen when not using sequence ids and/or with a custom
# message id provider
trace "Dropping already-seen message"
return 0
return err(DuplicateMessage)
g.mcache.put(msgId, msg)
if g.parameters.sendIDontWantOnPublish and isLargeMessage(msg, msgId):
g.sendIDontWant(msg, msgId, peers)
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
if g.knownTopics.contains(topic):
@@ -790,9 +816,11 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
trace "Published message to peers", peers = peers.len
return peers.len
return ok(peers.len())
proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
proc maintainDirectPeer(
g: GossipSub, id: PeerId, addrs: seq[MultiAddress]
) {.async: (raises: [CancelledError]).} =
if id notin g.peers:
trace "Attempting to dial a direct peer", peer = id
if g.switch.isConnected(id):
@@ -808,11 +836,13 @@ proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.as
except CatchableError as exc:
debug "Direct peer error dialing", description = exc.msg
proc addDirectPeer*(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
proc addDirectPeer*(
g: GossipSub, id: PeerId, addrs: seq[MultiAddress]
) {.async: (raises: [CancelledError]).} =
g.parameters.directPeers[id] = addrs
await g.maintainDirectPeer(id, addrs)
proc maintainDirectPeers(g: GossipSub) {.async.} =
proc maintainDirectPeers(g: GossipSub) {.async: (raises: [CancelledError]).} =
heartbeat "GossipSub DirectPeers", 1.minutes:
for id, addrs in g.parameters.directPeers:
await g.addDirectPeer(id, addrs)
@@ -846,9 +876,9 @@ method stop*(g: GossipSub): Future[void] {.async: (raises: [], raw: true).} =
return fut
# stop heartbeat interval
g.directPeersLoop.cancel()
g.scoringHeartbeatFut.cancel()
g.heartbeatFut.cancel()
g.directPeersLoop.cancelSoon()
g.scoringHeartbeatFut.cancelSoon()
g.heartbeatFut.cancelSoon()
g.heartbeatFut = nil
fut

View File

@@ -126,8 +126,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
if x.peerId in sprBook:
sprBook[x.peerId].encode().get(default(seq[byte]))
else:
default(seq[byte])
,
default(seq[byte]),
)
proc handleGraft*(
@@ -770,7 +769,7 @@ proc onHeartbeat(g: GossipSub) =
g.mcache.shift() # shift the cache
proc heartbeat*(g: GossipSub) {.async.} =
proc heartbeat*(g: GossipSub) {.async: (raises: [CancelledError]).} =
heartbeat "GossipSub", g.parameters.heartbeatInterval:
trace "running heartbeat", instance = cast[int](g)
g.onHeartbeat()

View File

@@ -131,7 +131,7 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
else:
0.0
proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async: (raises: []).} =
try:
await g.switch.disconnect(peer.peerId)
except CatchableError as exc: # Never cancelled
@@ -313,12 +313,14 @@ proc updateScores*(g: GossipSub) = # avoid async
trace "updated scores", peers = g.peers.len
proc scoringHeartbeat*(g: GossipSub) {.async.} =
proc scoringHeartbeat*(g: GossipSub) {.async: (raises: [CancelledError]).} =
heartbeat "Gossipsub scoring", g.parameters.decayInterval:
trace "running scoring heartbeat", instance = cast[int](g)
g.updateScores()
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async.} =
proc punishInvalidMessage*(
g: GossipSub, peer: PubSubPeer, msg: Message
) {.async: (raises: [PeerRateLimitError]).} =
let uselessAppBytesNum = msg.data.len
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
if not overheadRateLimit.tryConsume(uselessAppBytesNum):

View File

@@ -154,6 +154,9 @@ type
# Max number of elements allowed in the non-priority queue. When this limit has been reached, the peer will be disconnected.
maxNumElementsInNonPriorityQueue*: int
# Broadcast an IDONTWANT message automatically when the message exceeds the IDONTWANT message size threshold
sendIDontWantOnPublish*: bool
BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]]

View File

@@ -125,6 +125,8 @@ declarePublicCounter(
type
InitializationError* = object of LPError
PeerMessageDecodeError* = object of CatchableError
TopicHandler* {.public.} =
proc(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].}
@@ -143,6 +145,8 @@ type
## we have to store it, which may be an attack vector.
## This callback can be used to reject topics we're not interested in
PublishResult* {.public.} = Result[int, PublishOutcome]
PubSub* {.public.} = ref object of LPProtocol
switch*: Switch # the switch used to dial/connect to peers
peerInfo*: PeerInfo # this peer's info
@@ -257,7 +261,7 @@ proc broadcast*(
trace "broadcasting messages to peers", peers = sendPeers.len, payload = shortLog(msg)
if anyIt(sendPeers, it.hasObservers):
if anyIt(sendPeers, it.hasBeforeSendObservers):
for peer in sendPeers:
p.send(peer, msg, isHighPriority)
else:
@@ -327,7 +331,9 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
method rpcHandler*(
p: PubSub, peer: PubSubPeer, data: seq[byte]
): Future[void] {.base, async.} =
): Future[void] {.
base, async: (raises: [CancelledError, PeerMessageDecodeError, PeerRateLimitError])
.} =
## Handler that must be overridden by concrete implementation
raiseAssert "Unimplemented"
@@ -355,8 +361,11 @@ method getOrCreatePeer*(
peer[].codec = protoNegotiated
return peer[]
proc getConn(): Future[Connection] {.async.} =
return await p.switch.dial(peerId, protosToDial)
proc getConn(): Future[Connection] {.async: (raises: [GetConnDialError]).} =
try:
return await p.switch.dial(peerId, protosToDial)
except CatchableError as e:
raise (ref GetConnDialError)(parent: e)
proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
p.onPubSubPeerEvent(peer, event)
@@ -367,7 +376,7 @@ method getOrCreatePeer*(
debug "created new pubsub peer", peerId
p.peers[peerId] = pubSubPeer
pubSubPeer.observers = p.observers
pubSubPeer.setObservers(p.observers)
onNewPeer(p, pubSubPeer)
@@ -376,7 +385,9 @@ method getOrCreatePeer*(
return pubSubPeer
proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
proc handleData*(
p: PubSub, topic: string, data: seq[byte]
): Future[void] {.async: (raises: [], raw: true).} =
# Start work on all data handlers without copying data into a closure, as
# happens with the {.async.} transformation
p.topics.withValue(topic, handlers):
@@ -389,7 +400,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
futs.add(fut)
if futs.len() > 0:
proc waiter(): Future[void] {.async.} =
proc waiter(): Future[void] {.async: (raises: []).} =
# slow path - we have to wait for the handlers to complete
try:
futs = await allFinished(futs)
@@ -397,12 +408,12 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
# propagate cancellation
for fut in futs:
if not (fut.finished):
fut.cancel()
fut.cancelSoon()
# check for errors in futures
for fut in futs:
if fut.failed:
let err = fut.readError()
let err = fut.error()
warn "Error in topic handler", description = err.msg
return waiter()
@@ -412,7 +423,9 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
res.complete()
return res
method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} =
method handleConn*(
p: PubSub, conn: Connection, proto: string
) {.base, async: (raises: [CancelledError]).} =
## handle incoming connections
##
## this proc will:
@@ -424,7 +437,9 @@ method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} =
## that we're interested in
##
proc handler(peer: PubSubPeer, data: seq[byte]): Future[void] =
proc handler(
peer: PubSubPeer, data: seq[byte]
): Future[void] {.async: (raises: []).} =
# call pubsub rpc handler
p.rpcHandler(peer, data)
@@ -436,7 +451,7 @@ method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} =
trace "pubsub peer handler ended", conn
except CancelledError as exc:
raise exc
except CatchableError as exc:
except PeerMessageDecodeError as exc:
trace "exception ocurred in pubsub handle", description = exc.msg, conn
finally:
await conn.closeWithEOF()
@@ -540,9 +555,27 @@ proc subscribe*(p: PubSub, topic: string, handler: TopicHandler) {.public.} =
p.updateTopicMetrics(topic)
method publish*(
method createMessage*(
p: PubSub, topic: string, data: seq[byte]
): Future[int] {.base, async, public.} =
): Result[(Message, MessageId), string] {.base, gcsafe, raises: [].} =
let
msg =
if p.anonymize:
Message.init(none(PeerInfo), data, topic, none(uint64), false)
else:
inc p.msgSeqno
Message.init(some(p.peerInfo), data, topic, some(p.msgSeqno), p.sign)
msgId = p.msgIdProvider(msg).valueOr:
return err("Failed to generate message id")
return ok((msg, msgId))
# `Non-public` overridable interface for publishing messages.
# Although it must be exported, it is not intended to be called directly by
# users of the class; they should call `publish` instead.
method doPublish*(
p: PubSub, topic: string, data: seq[byte]
): Future[PublishResult] {.base, async: (raises: []).} =
## publish to a ``topic``
##
## The return value is the number of neighbours that we attempted to send the
@@ -552,7 +585,13 @@ method publish*(
if p.triggerSelf:
await handleData(p, topic, data)
return 0
return ok(0)
proc publish*(
p: PubSub, topic: string, data: seq[byte]
): Future[int] {.async: (raises: []), public.} =
return (await p.doPublish(topic, data)).valueOr:
return 0
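The public `publish` keeps its `int` signature for compatibility by flattening every error outcome to 0; code that needs the reason calls `doPublish` directly. A sketch, where `node` is any `PubSub` instance:

# Back-compat path: plain peer count, all errors collapsed to 0.
let sent = await node.publish("foobar", "Hello!".toBytes())
# New path: the PublishOutcome is preserved for inspection.
let outcome = await node.doPublish("foobar", "Hello!".toBytes())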
method initPubSub*(p: PubSub) {.base, raises: [InitializationError].} =
## perform pubsub initialization
@@ -581,7 +620,7 @@ method removeValidator*(
method validate*(
p: PubSub, message: Message
): Future[ValidationResult] {.async, base.} =
): Future[ValidationResult] {.async: (raises: [CancelledError]), base.} =
var pending: seq[Future[ValidationResult]]
trace "about to validate message"
let topic = message.topic
@@ -589,22 +628,27 @@ method validate*(
topic = topic, registered = toSeq(p.validators.keys)
if topic in p.validators:
trace "running validators for topic", topic = topic
for validator in p.validators[topic]:
pending.add(validator(topic, message))
result = ValidationResult.Accept
p.validators.withValue(topic, validators):
for validator in validators[]:
pending.add(validator(topic, message))
var valResult = ValidationResult.Accept
let futs = await allFinished(pending)
for fut in futs:
if fut.failed:
result = ValidationResult.Reject
valResult = ValidationResult.Reject
break
let res = fut.read()
if res != ValidationResult.Accept:
result = res
if res == ValidationResult.Reject:
break
try:
let res = fut.read()
if res != ValidationResult.Accept:
valResult = res
if res == ValidationResult.Reject:
break
except CatchableError as e:
trace "validator for message could not be executed, ignoring",
topic = topic, err = e.msg
valResult = ValidationResult.Ignore
case result
case valResult
of ValidationResult.Accept:
libp2p_pubsub_validation_success.inc()
of ValidationResult.Reject:
@@ -612,6 +656,8 @@ method validate*(
of ValidationResult.Ignore:
libp2p_pubsub_validation_ignore.inc()
valResult
proc init*[PubParams: object | bool](
P: typedesc[PubSub],
switch: Switch,

View File

@@ -64,9 +64,12 @@ const DefaultMaxNumElementsInNonPriorityQueue* = 1024
type
PeerRateLimitError* = object of CatchableError
GetConnDialError* = object of CatchableError
PubSubObserver* = ref object
onRecv*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].}
onSend*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].}
onAfterSent*: proc(peer: PubSubPeer, msgs: RPCMsg) {.gcsafe, raises: [].}
onBeforeSend*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].}
onValidated*:
proc(peer: PubSubPeer, msg: Message, msgId: MessageId) {.gcsafe, raises: [].}
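The split keeps sending on the fast path: `onBeforeSend` may still mutate the `RPCMsg` (forcing a copy and re-encode), while `onAfterSent` only reads the message after it was queued. A hypothetical read-only observer:

# Hypothetical: count outgoing RPCs without forcing the slow send path.
var sentCount = 0
let obs = PubSubObserver(
  onAfterSent: proc(peer: PubSubPeer, msgs: RPCMsg) =
    inc sentCount # msgs is read-only here, so no copy is needed
)
pubsubNode.addObserver(obs) # pubsubNode: any PubSub instance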
@@ -79,7 +82,7 @@ type
PubSubPeerEvent* = object
kind*: PubSubPeerEventKind
GetConn* = proc(): Future[Connection] {.gcsafe, raises: [].}
GetConn* = proc(): Future[Connection] {.gcsafe, async: (raises: [GetConnDialError]).}
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].}
# have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
@@ -101,7 +104,7 @@ type
address*: Option[MultiAddress]
peerId*: PeerId
handler*: RPCHandler
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
observers: ref seq[PubSubObserver] # ref as in smart_ptr
score*: float64
sentIHaves*: Deque[HashSet[MessageId]]
@@ -121,8 +124,9 @@ type
# The max number of elements allowed in the non-priority queue.
disconnected: bool
RPCHandler* =
proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].}
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.
gcsafe, async: (raises: [])
.}
when defined(libp2p_agents_metrics):
func shortAgent*(p: PubSubPeer): string =
@@ -157,9 +161,6 @@ chronicles.formatIt(PubSubPeer):
proc connected*(p: PubSubPeer): bool =
not p.sendConn.isNil and not (p.sendConn.closed or p.sendConn.atEof)
proc hasObservers*(p: PubSubPeer): bool =
p.observers != nil and anyIt(p.observers[], it != nil)
func outbound*(p: PubSubPeer): bool =
# gossipsub 1.1 spec requires us to know if the transport is outgoing
# in order to give priority to connections we make
@@ -167,30 +168,44 @@ func outbound*(p: PubSubPeer): bool =
# This behaviour is prescribed to counter sybil attacks and ensures that a coordinated inbound attack can never fully take over the mesh
if not p.sendConn.isNil and p.sendConn.transportDir == Direction.Out: true else: false
proc setObservers*(p: PubSubPeer, observers: ref seq[PubSubObserver]) =
p.observers = observers
proc hasObservers*(p: PubSubPeer): bool =
p.observers != nil and anyIt(p.observers[], it != nil)
proc hasBeforeSendObservers*(p: PubSubPeer): bool =
p.observers != nil and anyIt(p.observers[], it != nil and it.onBeforeSend != nil)
proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
if not (isNil(p.observers)) and p.observers[].len > 0:
if not (isNil(p.observers)):
for obs in p.observers[]:
if not (isNil(obs)): # TODO: should never be nil, but...
if not (isNil(obs.onRecv)):
obs.onRecv(p, msg)
if not (isNil(obs)) and not (isNil(obs.onRecv)):
obs.onRecv(p, msg)
proc validatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) =
# trigger hooks
if not (isNil(p.observers)) and p.observers[].len > 0:
if not (isNil(p.observers)):
for obs in p.observers[]:
if not (isNil(obs.onValidated)):
if not (isNil(obs)) and not (isNil(obs.onValidated)):
obs.onValidated(p, msg, msgId)
proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
proc beforeSendObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
if not (isNil(p.observers)) and p.observers[].len > 0:
if not (isNil(p.observers)):
for obs in p.observers[]:
if not (isNil(obs)): # TODO: should never be nil, but...
if not (isNil(obs.onSend)):
obs.onSend(p, msg)
if not (isNil(obs)) and not (isNil(obs.onBeforeSend)):
obs.onBeforeSend(p, msg)
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
proc afterSentObservers(p: PubSubPeer, msg: RPCMsg) =
# trigger hooks
if not (isNil(p.observers)):
for obs in p.observers[]:
if not (isNil(obs)) and not (isNil(obs.onAfterSent)):
obs.onAfterSent(p, msg)
proc handle*(p: PubSubPeer, conn: Connection) {.async: (raises: []).} =
debug "starting pubsub read loop", conn, peer = p, closed = conn.closed
try:
try:
@@ -206,7 +221,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
except PeerRateLimitError as exc:
debug "Peer rate limit exceeded, exiting read while",
conn, peer = p, description = exc.msg
except CatchableError as exc:
except LPStreamError as exc:
debug "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, description = exc.msg
finally:
@@ -215,13 +230,12 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
# This is a top-level procedure which will work as a separate task, so it
# does not need to propagate CancelledError.
trace "Unexpected cancellation in PubSubPeer.handle"
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, description = exc.msg
finally:
debug "exiting pubsub read loop", conn, peer = p, closed = conn.closed
proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
proc closeSendConn(
p: PubSubPeer, event: PubSubPeerEventKind
) {.async: (raises: [CancelledError]).} =
if p.sendConn != nil:
trace "Removing send connection", p, conn = p.sendConn
await p.sendConn.close()
@@ -235,17 +249,20 @@ proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
p.onEvent(p, PubSubPeerEvent(kind: event))
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Errors during diconnection events", description = exc.msg
# don't clean up p.address, else we leak some gossip stat table
proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
proc connectOnce(
p: PubSubPeer
): Future[void] {.async: (raises: [CancelledError, GetConnDialError, LPError]).} =
try:
if p.connectedFut.finished:
p.connectedFut = newFuture[void]()
let newConn = await p.getConn().wait(5.seconds)
if newConn.isNil:
raise (ref LPError)(msg: "Cannot establish send connection")
let newConn =
try:
await p.getConn().wait(5.seconds)
except AsyncTimeoutError as error:
trace "getConn timed out", description = error.msg
raise (ref LPError)(msg: "Cannot establish send connection")
# When the send channel goes up, subscriptions need to be sent to the
# remote peer - if we had multiple channels up and one goes down, all
@@ -271,7 +288,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
finally:
await p.closeSendConn(PubSubPeerEventKind.StreamClosed)
proc connectImpl(p: PubSubPeer) {.async.} =
proc connectImpl(p: PubSubPeer) {.async: (raises: []).} =
try:
# Keep trying to establish a connection while it's possible to do so - the
# send connection might get disconnected due to a timeout or an unrelated
@@ -282,7 +299,11 @@ proc connectImpl(p: PubSubPeer) {.async.} =
p.connectedFut.complete()
return
await connectOnce(p)
except CatchableError as exc: # never cancelled
except CancelledError as exc:
debug "Could not establish send connection", description = exc.msg
except LPError as exc:
debug "Could not establish send connection", description = exc.msg
except GetConnDialError as exc:
debug "Could not establish send connection", description = exc.msg
proc connect*(p: PubSubPeer) =
@@ -317,7 +338,7 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
value = p.rpcmessagequeue.sendPriorityQueue.len.int64, labelValues = [$p.peerId]
)
proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async: (raises: []).} =
# Continuation for a pending `sendMsg` future from below
try:
await msgFut
@@ -331,7 +352,7 @@ proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
await conn.close() # This will clean up the send connection
proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async: (raises: [CancelledError]).} =
# Slow path of `sendMsg` where msg is held in memory while send connection is
# being set up
if p.sendConn == nil:
@@ -347,7 +368,7 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
await sendMsgContinue(conn, conn.writeLp(msg))
proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] =
proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] {.async: (raises: []).} =
if p.sendConn != nil and not p.sendConn.closed():
# Fast path that avoids copying msg (which happens for {.async.})
let conn = p.sendConn
@@ -466,10 +487,10 @@ proc send*(
# some forms of valid but redundantly encoded protobufs with unknown or
# duplicated fields
let encoded =
if p.hasObservers():
if p.hasBeforeSendObservers():
var mm = msg
# trigger send hooks
p.sendObservers(mm)
p.beforeSendObservers(mm)
sendMetrics(mm)
encodeRpcMsg(mm, anonymize)
else:
@@ -486,6 +507,8 @@ proc send*(
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded, isHighPriority)
p.afterSentObservers(msg)
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
if msgId in sentIHave:
@@ -493,7 +516,7 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
return true
return false
proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
proc sendNonPriorityTask(p: PubSubPeer) {.async: (raises: [CancelledError]).} =
while true:
# we send non-priority messages only if there are no pending priority messages
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()

View File

@@ -65,7 +65,10 @@ proc init*(
topic: string,
seqno: Option[uint64],
sign: bool = true,
): Message {.gcsafe, raises: [LPError].} =
): Message {.gcsafe, raises: [].} =
if sign and peer.isNone():
doAssert(false, "Cannot sign message without peer info")
var msg = Message(data: data, topic: topic)
# order matters, we want to include seqno in the signature
@@ -81,9 +84,6 @@ proc init*(
.expect("Invalid private key!")
.getBytes()
.expect("Couldn't get public key bytes!")
else:
if sign:
raise (ref LPError)(msg: "Cannot sign message without peer info")
msg

View File

@@ -303,15 +303,14 @@ proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} =
if ?pb.getRepeatedField(2, msgpbs):
trace "decodeMessages: read messages", count = len(msgpbs)
for item in msgpbs:
# size is constrained at the network level
msgs.add(?decodeMessage(initProtoBuffer(item, maxSize = uint.high)))
msgs.add(?decodeMessage(initProtoBuffer(item)))
else:
trace "decodeMessages: no messages found"
ok(msgs)
proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
trace "encodeRpcMsg: encoding message", payload = msg.shortLog()
var pb = initProtoBuffer(maxSize = uint.high)
var pb = initProtoBuffer()
for item in msg.subscriptions:
pb.write(1, item)
for item in msg.messages:
@@ -330,7 +329,7 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
trace "decodeRpcMsg: decoding message", payload = msg.shortLog()
var pb = initProtoBuffer(msg, maxSize = uint.high)
var pb = initProtoBuffer(msg)
var rpcMsg = RPCMsg()
assign(rpcMsg.messages, ?pb.decodeMessages())
assign(rpcMsg.subscriptions, ?pb.decodeSubscriptions())

View File

@@ -14,6 +14,7 @@ import metrics except collect
import chronos, chronicles, bearssl/rand, stew/[byteutils, objects, results]
import
./protocol,
../protobuf/minprotobuf,
../switch,
../routing_record,
../utils/heartbeat,
@@ -750,7 +751,7 @@ proc new*(
rng: ref HmacDrbgContext = newRng(),
minDuration = MinimumDuration,
maxDuration = MaximumDuration,
): T =
): T {.raises: [RendezVousError].} =
let rdv = T.new(rng, minDuration, maxDuration)
rdv.setup(switch)
return rdv

View File

@@ -44,8 +44,7 @@ chronicles.formatIt(BufferStream):
proc len*(s: BufferStream): int =
s.readBuf.len + (if s.readQueue.len > 0: s.readQueue[0].len()
else: 0
)
else: 0)
method initStream*(s: BufferStream) =
if s.objName.len == 0:

View File

@@ -43,6 +43,7 @@ type
oid*: Oid
dir*: Direction
closedWithEOF: bool # prevent concurrent calls
isClosedRemotely*: bool
LPStreamError* = object of LPError
LPStreamIncompleteError* = object of LPStreamError

View File

@@ -99,8 +99,7 @@ proc new*(
if ServerFlags.TcpNoDelay in flags:
{SocketFlags.TcpNoDelay}
else:
default(set[SocketFlags])
,
default(set[SocketFlags]),
upgrader: upgrade,
networkReachability: NetworkReachability.Unknown,
connectionsTimeout: connectionsTimeout,

View File

@@ -71,23 +71,6 @@ proc capLen*[T](s: var seq[T], length: Natural) =
if s.len > length:
s.setLen(length)
template exceptionToAssert*(body: untyped): untyped =
block:
var res: type(body)
when defined(nimHasWarnBareExcept):
{.push warning[BareExcept]: off.}
try:
res = body
except CatchableError as exc:
raise exc
except Defect as exc:
raise exc
except Exception as exc:
raiseAssert exc.msg
when defined(nimHasWarnBareExcept):
{.pop.}
res
template withValue*[T](self: Opt[T] | Option[T], value, body: untyped): untyped =
## This template provides a convenient way to work with `Option` types in Nim.
## It allows you to execute a block of code (`body`) only when the `Option` is not empty.
@@ -149,3 +132,11 @@ template exclIfIt*[T](set: var HashSet[T], condition: untyped) =
if condition:
toExcl.incl(it)
set.excl(toExcl)
template filterIt*[T](set: HashSet[T], condition: untyped): HashSet[T] =
var filtered = HashSet[T]()
if set.len != 0:
for it {.inject.} in set:
if condition:
filtered.incl(it)
filtered
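Unlike `exclIfIt` above, `filterIt` leaves the source set untouched and returns the matching subset. For instance:

import std/sets

# filterIt builds a new HashSet; the original is not modified.
let numbers = toHashSet([1, 2, 3, 4])
let evens = numbers.filterIt(it mod 2 == 0)
doAssert evens == toHashSet([2, 4])
doAssert numbers.len == 4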

View File

@@ -13,21 +13,25 @@ import chronos
type AllFuturesFailedError* = object of CatchableError
proc anyCompleted*[T](futs: seq[Future[T]]): Future[Future[T]] {.async.} =
proc anyCompleted*[T](
futs: seq[Future[T]]
): Future[Future[T]] {.async: (raises: [AllFuturesFailedError, CancelledError]).} =
## Returns a future that will complete with the first future that completes.
## If all futures fail or futs is empty, the returned future will fail with AllFuturesFailedError.
var requests = futs
while true:
if requests.len == 0:
var raceFut: Future[T]
try:
raceFut = await one(requests)
if raceFut.completed:
return raceFut
except ValueError:
raise newException(
AllFuturesFailedError, "None of the futures completed successfully"
)
var raceFut = await one(requests)
if raceFut.completed:
return raceFut
let index = requests.find(raceFut)
requests.del(index)
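The rewrite leans on chronos's `one`, which raises `ValueError` for an empty future list; translating that into `AllFuturesFailedError` keeps the documented contract even when `futs` starts empty or every entry has already failed. A hypothetical race, where `dialPeerA`/`dialPeerB` stand in for any futures:

# Take whichever attempt completes first; fail only if all of them do.
try:
  let first = await anyCompleted(@[dialPeerA(), dialPeerB()])
  echo "winner: ", await first
except AllFuturesFailedError:
  echo "every attempt failed"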

View File

@@ -67,7 +67,9 @@ proc connect*(
child: StreamTransport = nil,
flags = default(set[SocketFlags]),
localAddress: Opt[MultiAddress] = Opt.none(MultiAddress),
): Future[StreamTransport] {.async.} =
): Future[StreamTransport] {.
async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError])
.} =
## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
@@ -86,8 +88,7 @@ proc connect*(
if localAddress.isSome():
initTAddress(localAddress.expect("just checked")).tryGet()
else:
TransportAddress()
,
TransportAddress(),
flags,
)
do:

View File

@@ -9,8 +9,7 @@ type
SwitchCreator = proc(
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
prov: TransportProvider = proc(upgr: Upgrade): Transport =
TcpTransport.new({}, upgr)
,
TcpTransport.new({}, upgr),
relay: Relay = Relay.new(circuitRelayV1 = true),
): Switch {.gcsafe, raises: [LPError].}
DaemonPeerInfo = daemonapi.PeerInfo
@@ -83,7 +82,7 @@ proc testPubSubDaemonPublish(
proc pubsubHandler(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CatchableError]).} =
result = true # don't cancel subscription
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
@@ -137,7 +136,7 @@ proc testPubSubNodePublish(
var finished = false
proc pubsubHandler(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CatchableError]).} =
let smsg = string.fromBytes(message.data)
check smsg == pubsubData
times.inc()
@@ -186,7 +185,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[void]("test.future")
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
check string.fromBytes(await stream.transp.readLp()) == "test 1"
discard await stream.transp.writeLp("test 2")
check string.fromBytes(await stream.transp.readLp()) == "test 3"
@@ -228,7 +229,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[string]("test.future")
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
# We should perform `readLp()` instead of `readLine()`. `readLine()`
# here actually reads a length-prefixed string.
var line = await stream.transp.readLine()
@@ -301,8 +304,7 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
let nativeNode = swCreator(
ma = wsAddress,
prov = proc(upgr: Upgrade): Transport =
WsTransport.new(upgr)
,
WsTransport.new(upgr),
)
nativeNode.mount(proto)
@@ -353,7 +355,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[string]("test.future")
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
# We should perform `readLp()` instead of `readLine()`. `readLine()`
# here reads actually length prefixed string.
var line = await stream.transp.readLine()
@@ -487,7 +491,9 @@ proc relayInteropTests*(name: string, relayCreator: SwitchCreator) =
# TODO: This Future blocks the daemonHandler after sending the last message.
# It exists because there's a strange behavior where stream.close sends
# a Rst instead of Fin. We should investigate this at some point.
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
check "line1" == string.fromBytes(await stream.transp.readLp())
discard await stream.transp.writeLp("line2")
check "line3" == string.fromBytes(await stream.transp.readLp())

View File

@@ -36,7 +36,7 @@ suite "GossipSub internal":
asyncTest "subscribe/unsubscribeAll":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe.} =
proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
discard
let topic = "foobar"
@@ -162,7 +162,7 @@ suite "GossipSub internal":
asyncTest "`replenishFanout` Degree Lo":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
@@ -189,7 +189,7 @@ suite "GossipSub internal":
asyncTest "`dropFanoutPeers` drop expired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
@@ -219,7 +219,7 @@ suite "GossipSub internal":
asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic1 = "foobar1"
@@ -256,7 +256,7 @@ suite "GossipSub internal":
asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
@@ -317,7 +317,7 @@ suite "GossipSub internal":
asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
@@ -357,7 +357,7 @@ suite "GossipSub internal":
asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
@@ -398,7 +398,7 @@ suite "GossipSub internal":
asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
discard
let topic = "foobar"
@@ -439,7 +439,7 @@ suite "GossipSub internal":
asyncTest "Drop messages of topics without subscription":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
check false
let topic = "foobar"
@@ -473,7 +473,7 @@ suite "GossipSub internal":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.disconnectBadPeers = true
gossipSub.parameters.appSpecificWeight = 1.0
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
check false
let topic = "foobar"
@@ -665,7 +665,7 @@ suite "GossipSub internal":
asyncTest "handleIHave/Iwant tests":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
check false
proc handler2(topic: string, data: seq[byte]) {.async.} =

View File

@@ -223,7 +223,8 @@ suite "GossipSub":
asyncTest "GossipSub's observers should run after message is sent, received and validated":
var
recvCounter = 0
sendCounter = 0
afterSentCounter = 0
beforeSendCounter = 0
validatedCounter = 0
proc handler(topic: string, data: seq[byte]) {.async.} =
@@ -232,13 +233,16 @@ suite "GossipSub":
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
inc recvCounter
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
inc sendCounter
proc onBeforeSend(peer: PubSubPeer, msgs: var RPCMsg) =
inc beforeSendCounter
proc onAfterSent(peer: PubSubPeer, msgs: RPCMsg) =
inc afterSentCounter
proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
inc validatedCounter
let obs0 = PubSubObserver(onSend: onSend)
let obs0 = PubSubObserver(onBeforeSend: onBeforeSend, onAfterSent: onAfterSent)
let obs1 = PubSubObserver(onRecv: onRecv, onValidated: onValidated)
let nodes = generateNodes(2, gossip = true)
@@ -265,7 +269,8 @@ suite "GossipSub":
check:
recvCounter == 1
validatedCounter == 1
sendCounter == 1
afterSentCounter == 1
beforeSendCounter == 1
# Send message that will be rejected by the receiver's validator
tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1
@@ -273,7 +278,8 @@ suite "GossipSub":
check:
recvCounter == 2
validatedCounter == 1
sendCounter == 2
afterSentCounter == 2
beforeSendCounter == 2
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
@@ -417,12 +423,17 @@ suite "GossipSub":
inc observed
)
obs2 = PubSubObserver(
onSend: proc(peer: PubSubPeer, msgs: var RPCMsg) =
onBeforeSend: proc(peer: PubSubPeer, msgs: var RPCMsg) =
inc observed
)
obs3 = PubSubObserver(
onAfterSent: proc(peer: PubSubPeer, msgs: RPCMsg) =
inc observed
)
nodes[1].addObserver(obs1)
nodes[0].addObserver(obs2)
nodes[0].addObserver(obs3)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
@@ -439,7 +450,7 @@ suite "GossipSub":
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
check observed == 2
check observed == 3
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
var passed = newFuture[void]()
@@ -566,19 +577,22 @@ suite "GossipSub":
proc slowValidator(
topic: string, message: Message
): Future[ValidationResult] {.async.} =
await cRelayed
# Empty A & C caches to detect duplicates
gossip1.seen = TimedCache[SaltedId].init()
gossip3.seen = TimedCache[SaltedId].init()
let msgId = toSeq(gossip2.validationSeen.keys)[0]
checkUntilTimeout(
try:
gossip2.validationSeen[msgId].len > 0
except:
false
)
result = ValidationResult.Accept
bFinished.complete()
try:
await cRelayed
# Empty A & C caches to detect duplicates
gossip1.seen = TimedCache[SaltedId].init()
gossip3.seen = TimedCache[SaltedId].init()
let msgId = toSeq(gossip2.validationSeen.keys)[0]
checkUntilTimeout(
try:
gossip2.validationSeen[msgId].len > 0
except:
false
)
result = ValidationResult.Accept
bFinished.complete()
except CatchableError:
raiseAssert "err on slowValidator"
nodes[1].addValidator("foobar", slowValidator)
@@ -722,9 +736,7 @@ suite "GossipSub":
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
seen.mgetOrPut(peerName, 0).inc()
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
@@ -772,9 +784,12 @@ suite "GossipSub":
capture dialer, i:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
try:
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
except KeyError:
raiseAssert "seen checked before"
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
@@ -928,6 +943,44 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - iDontWant is broadcasted on publish":
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
ok(newSeq[byte](10))
let
nodes = generateNodes(
2,
gossip = true,
msgIdProvider = dumbMsgIdProvider,
sendIDontWantOnPublish = true,
)
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await nodes[0].switch.connect(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
)
proc handlerA(topic: string, data: seq[byte]) {.async.} =
discard
proc handlerB(topic: string, data: seq[byte]) {.async.} =
discard
nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
await waitSubGraph(nodes, "foobar")
var gossip2: GossipSub = GossipSub(nodes[1])
tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1
checkUntilTimeout:
gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - iDontWant is sent only for 1.2":
# 3 nodes: A <=> B <=> C
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,
@@ -1131,7 +1184,7 @@ suite "GossipSub":
let topic = "foobar"
proc execValidator(
topic: string, message: messages.Message
): Future[ValidationResult] {.raises: [].} =
): Future[ValidationResult] {.async: (raw: true).} =
let res = newFuture[ValidationResult]()
res.complete(ValidationResult.Reject)
res

View File

@@ -66,9 +66,7 @@ suite "GossipSub":
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
seen.mgetOrPut(peerName, 0).inc()
info "seen up", count = seen.len
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
@@ -275,9 +273,7 @@ suite "GossipSub":
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
seen.mgetOrPut(peerName, 0).inc()
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()

View File

@@ -27,8 +27,11 @@ randomize()
type TestGossipSub* = ref object of GossipSub
proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
proc getConn(): Future[Connection] =
p.switch.dial(peerId, GossipSubCodec_12)
proc getConn(): Future[Connection] {.async: (raises: [GetConnDialError]).} =
try:
return await p.switch.dial(peerId, GossipSubCodec_12)
except CatchableError as e:
raise (ref GetConnDialError)(parent: e)
let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec_12, 1024 * 1024)
debug "created new pubsub peer", peerId
@@ -71,6 +74,7 @@ proc generateNodes*(
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
Opt.none(tuple[bytes: int, interval: Duration]),
gossipSubVersion: string = "",
sendIDontWantOnPublish: bool = false,
): seq[PubSub] =
for i in 0 ..< num:
let switch = newStandardSwitch(
@@ -94,6 +98,7 @@ proc generateNodes*(
p.unsubscribeBackoff = unsubscribeBackoff
p.enablePX = enablePX
p.overheadRateLimit = overheadRateLimit
p.sendIDontWantOnPublish = sendIDontWantOnPublish
p
),
)

View File

@@ -28,7 +28,9 @@ proc connectStreamTest(): Future[bool] {.async.} =
var testFuture = newFuture[string]("test.future")
proc streamHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
proc streamHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
var line = await stream.transp.readLine()
testFuture.complete(line)
@@ -62,7 +64,7 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
proc pubsubHandler1(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CatchableError]).} =
let smsg = cast[string](message.data)
if smsg == pubsubData:
inc(resultsCount)
@@ -72,7 +74,7 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
proc pubsubHandler2(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CatchableError]).} =
let smsg = cast[string](message.data)
if smsg == pubsubData:
inc(resultsCount)

View File

@@ -5,8 +5,7 @@ import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/[relay, c
proc switchMplexCreator(
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
prov: TransportProvider = proc(upgr: Upgrade): Transport =
TcpTransport.new({}, upgr)
,
TcpTransport.new({}, upgr),
relay: Relay = Relay.new(circuitRelayV1 = true),
): Switch {.raises: [LPError].} =
SwitchBuilder
@@ -29,8 +28,7 @@ proc switchMplexCreator(
proc switchYamuxCreator(
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
prov: TransportProvider = proc(upgr: Upgrade): Transport =
TcpTransport.new({}, upgr)
,
TcpTransport.new({}, upgr),
relay: Relay = Relay.new(circuitRelayV1 = true),
): Switch {.raises: [LPError].} =
SwitchBuilder

View File

@@ -11,7 +11,8 @@
import unittest2
import ../libp2p/protobuf/minprotobuf
import stew/byteutils, strutils
import ../libp2p/varint
import stew/byteutils, strutils, sequtils
suite "MinProtobuf test suite":
const VarintVectors = [
@@ -615,29 +616,25 @@ suite "MinProtobuf test suite":
res.get() == true
value == "SOME"
test "[length] too big message test":
var pb1 = initProtoBuffer()
var bigString = newString(pb1.maxSize + 1)
test "[length] invalid message length":
let field = @[0x0a'u8]
let data = @[1'u8, 2'u8, 3'u8]
for i in 0 ..< len(bigString):
bigString[i] = 'A'
pb1.write(1, bigString)
pb1.finish()
block:
var pb2 = initProtoBuffer(pb1.buffer)
var value = newString(pb1.maxSize + 1)
var valueLen = 0
let res = pb2.getField(1, value, valueLen)
check:
res.isErr() == true
var value: string
var valueLen = 0
block:
var pb2 = initProtoBuffer(pb1.buffer, maxSize = uint.high)
var value = newString(pb1.maxSize + 1)
var valueLen = 0
let res = pb2.getField(1, value, valueLen)
check:
res.isErr() == false
let invalidLength = LP.encodeVarint(uint64(int32.high) + 1'u64)
var pb1 = initProtoBuffer(field & invalidLength.get() & data)
let res1 = pb1.getField(1, value, valueLen)
check:
res1.isErr() == true
res1.error() == MessageIncomplete
let validLength = LP.encodeVarint(data.lenu64)
var pb2 = initProtoBuffer(field & validLength.get() & data)
let res2 = pb2.getField(1, value, valueLen)
check:
res2.isErr() == false
test "[length] Repeated field test":
var pb1 = initProtoBuffer()

View File

@@ -15,7 +15,7 @@ import std/sequtils
import unittest2
import stew/byteutils
import ../libp2p/[multicodec, multiaddress]
import ../libp2p/[multicodec, multiaddress, protobuf/minprotobuf]
type PatternVector = object
pattern: MaPattern

View File

@@ -19,6 +19,7 @@ import
protocols/connectivity/relay/utils,
protocols/connectivity/relay/rtransport,
multiaddress,
protobuf/minprotobuf,
peerinfo,
peerid,
stream/connection,

View File

@@ -842,6 +842,8 @@ suite "Switch":
switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
switch1.peerStore[LastSeenBook][switch2.peerInfo.peerId].isSome()
switch1.peerInfo.peerId notin switch2.peerStore[AddressBook]
switch1.peerInfo.peerId notin switch2.peerStore[ProtoBook]