mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-09 14:28:11 -05:00
Compare commits
8 Commits
chore/rela
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5f6b8e86a5 | ||
|
|
11b98b7a3f | ||
|
|
647f76341e | ||
|
|
fbf96bb2ce | ||
|
|
f0aaecb743 | ||
|
|
8d3076ea99 | ||
|
|
70b7d61436 | ||
|
|
37bae0986c |
9
.github/workflows/ci.yml
vendored
9
.github/workflows/ci.yml
vendored
@@ -72,15 +72,6 @@ jobs:
|
||||
shell: ${{ matrix.shell }}
|
||||
nim_ref: ${{ matrix.nim.ref }}
|
||||
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '~1.16.0' # That's the minimum Go version that works with arm.
|
||||
|
||||
- name: Install p2pd
|
||||
run: |
|
||||
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
|
||||
|
||||
- name: Restore deps from cache
|
||||
id: deps-cache
|
||||
uses: actions/cache@v3
|
||||
|
||||
10
.github/workflows/daily_common.yml
vendored
10
.github/workflows/daily_common.yml
vendored
@@ -69,16 +69,6 @@ jobs:
|
||||
nim_ref: ${{ matrix.nim.ref }}
|
||||
cpu: ${{ matrix.cpu }}
|
||||
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '~1.16.0'
|
||||
cache: false
|
||||
|
||||
- name: Install p2pd
|
||||
run: |
|
||||
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
|
||||
|
||||
- name: Install dependencies (pinned)
|
||||
if: ${{ inputs.pinned_deps }}
|
||||
run: |
|
||||
|
||||
10
README.md
10
README.md
@@ -47,7 +47,7 @@ nimble install libp2p
|
||||
You'll find the nim-libp2p documentation [here](https://vacp2p.github.io/nim-libp2p/docs/). See [examples](./examples) for simple usage patterns.
|
||||
|
||||
## Getting Started
|
||||
Try out the chat example. For this you'll need to have [`go-libp2p-daemon`](examples/go-daemon/daemonapi.md) running. Full code can be found [here](https://github.com/status-im/nim-libp2p/blob/master/examples/chat.nim):
|
||||
Try out the chat example. Full code can be found [here](https://github.com/vacp2p/nim-libp2p/blob/master/examples/directchat.nim):
|
||||
|
||||
```bash
|
||||
nim c -r --threads:on examples/directchat.nim
|
||||
@@ -81,12 +81,6 @@ Run unit tests:
|
||||
# run all the unit tests
|
||||
nimble test
|
||||
```
|
||||
**Obs:** Running all tests requires the [`go-libp2p-daemon` to be installed and running](examples/go-daemon/daemonapi.md).
|
||||
|
||||
If you only want to run tests that don't require `go-libp2p-daemon`, use:
|
||||
```
|
||||
nimble testnative
|
||||
```
|
||||
|
||||
For a list of all available test suites, use:
|
||||
```
|
||||
@@ -155,8 +149,6 @@ List of packages modules implemented in nim-libp2p:
|
||||
| [connmanager](libp2p/connmanager.nim) | Connection manager |
|
||||
| [identify / push identify](libp2p/protocols/identify.nim) | [Identify](https://docs.libp2p.io/concepts/fundamentals/protocols/#identify) protocol |
|
||||
| [ping](libp2p/protocols/ping.nim) | [Ping](https://docs.libp2p.io/concepts/fundamentals/protocols/#ping) protocol |
|
||||
| [libp2p-daemon-client](libp2p/daemon/daemonapi.nim) | [go-daemon](https://github.com/libp2p/go-libp2p-daemon) nim wrapper |
|
||||
| [interop-libp2p](tests/testinterop.nim) | Interop tests |
|
||||
| **Transports** | |
|
||||
| [libp2p-tcp](libp2p/transports/tcptransport.nim) | TCP transport |
|
||||
| [libp2p-ws](libp2p/transports/wstransport.nim) | WebSocket & WebSocket Secure transport |
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
import chronos, nimcrypto, strutils
|
||||
import ../../libp2p/daemon/daemonapi
|
||||
import ../hexdump
|
||||
|
||||
const PubSubTopic = "test-net"
|
||||
|
||||
proc dumpSubscribedPeers(api: DaemonAPI) {.async.} =
|
||||
var peers = await api.pubsubListPeers(PubSubTopic)
|
||||
echo "= List of connected and subscribed peers:"
|
||||
for item in peers:
|
||||
echo item.pretty()
|
||||
|
||||
proc dumpAllPeers(api: DaemonAPI) {.async.} =
|
||||
var peers = await api.listPeers()
|
||||
echo "Current connected peers count = ", len(peers)
|
||||
for item in peers:
|
||||
echo item.peer.pretty()
|
||||
|
||||
proc monitor(api: DaemonAPI) {.async.} =
|
||||
while true:
|
||||
echo "Dumping all peers"
|
||||
await dumpAllPeers(api)
|
||||
await sleepAsync(5000)
|
||||
|
||||
proc main() {.async.} =
|
||||
echo "= Starting P2P bootnode"
|
||||
var api = await newDaemonApi({DHTFull, PSGossipSub})
|
||||
var id = await api.identity()
|
||||
echo "= P2P bootnode ", id.peer.pretty(), " started."
|
||||
let mcip4 = multiCodec("ip4")
|
||||
let mcip6 = multiCodec("ip6")
|
||||
echo "= You can use one of this addresses to bootstrap your nodes:"
|
||||
for item in id.addresses:
|
||||
if item.protoCode() == mcip4 or item.protoCode() == mcip6:
|
||||
echo $item & "/ipfs/" & id.peer.pretty()
|
||||
|
||||
asyncSpawn monitor(api)
|
||||
|
||||
proc pubsubLogger(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async.} =
|
||||
let msglen = len(message.data)
|
||||
echo "= Recieved pubsub message with length ",
|
||||
msglen, " bytes from peer ", message.peer.pretty()
|
||||
echo dumpHex(message.data)
|
||||
await api.dumpSubscribedPeers()
|
||||
result = true
|
||||
|
||||
var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger)
|
||||
|
||||
when isMainModule:
|
||||
waitFor(main())
|
||||
while true:
|
||||
poll()
|
||||
@@ -1,132 +0,0 @@
|
||||
import chronos, nimcrypto, strutils
|
||||
import ../../libp2p/daemon/daemonapi
|
||||
|
||||
## nim c -r --threads:on chat.nim
|
||||
when not (compileOption("threads")):
|
||||
{.fatal: "Please, compile this program with the --threads:on option!".}
|
||||
|
||||
const ServerProtocols = @["/test-chat-stream"]
|
||||
|
||||
type CustomData = ref object
|
||||
api: DaemonAPI
|
||||
remotes: seq[StreamTransport]
|
||||
consoleFd: AsyncFD
|
||||
serveFut: Future[void]
|
||||
|
||||
proc threadMain(wfd: AsyncFD) {.thread.} =
|
||||
## This procedure performs reading from `stdin` and sends data over
|
||||
## pipe to main thread.
|
||||
var transp = fromPipe(wfd)
|
||||
|
||||
while true:
|
||||
var line = stdin.readLine()
|
||||
let res = waitFor transp.write(line & "\r\n")
|
||||
|
||||
proc serveThread(udata: CustomData) {.async.} =
|
||||
## This procedure perform reading on pipe and sends data to remote clients.
|
||||
var transp = fromPipe(udata.consoleFd)
|
||||
|
||||
proc remoteReader(transp: StreamTransport) {.async.} =
|
||||
while true:
|
||||
var line = await transp.readLine()
|
||||
if len(line) == 0:
|
||||
break
|
||||
echo ">> ", line
|
||||
|
||||
while true:
|
||||
try:
|
||||
var line = await transp.readLine()
|
||||
if line.startsWith("/connect"):
|
||||
var parts = line.split(" ")
|
||||
if len(parts) == 2:
|
||||
var peerId = PeerId.init(parts[1])
|
||||
var address = MultiAddress.init(multiCodec("p2p-circuit"))
|
||||
address &= MultiAddress.init(multiCodec("p2p"), peerId)
|
||||
echo "= Searching for peer ", peerId.pretty()
|
||||
var id = await udata.api.dhtFindPeer(peerId)
|
||||
echo "= Peer " & parts[1] & " found at addresses:"
|
||||
for item in id.addresses:
|
||||
echo $item
|
||||
echo "= Connecting to peer ", $address
|
||||
await udata.api.connect(peerId, @[address], 30)
|
||||
echo "= Opening stream to peer chat ", parts[1]
|
||||
var stream = await udata.api.openStream(peerId, ServerProtocols)
|
||||
udata.remotes.add(stream.transp)
|
||||
echo "= Connected to peer chat ", parts[1]
|
||||
asyncSpawn remoteReader(stream.transp)
|
||||
elif line.startsWith("/search"):
|
||||
var parts = line.split(" ")
|
||||
if len(parts) == 2:
|
||||
var peerId = PeerId.init(parts[1])
|
||||
echo "= Searching for peer ", peerId.pretty()
|
||||
var id = await udata.api.dhtFindPeer(peerId)
|
||||
echo "= Peer " & parts[1] & " found at addresses:"
|
||||
for item in id.addresses:
|
||||
echo $item
|
||||
elif line.startsWith("/consearch"):
|
||||
var parts = line.split(" ")
|
||||
if len(parts) == 2:
|
||||
var peerId = PeerId.init(parts[1])
|
||||
echo "= Searching for peers connected to peer ", parts[1]
|
||||
var peers = await udata.api.dhtFindPeersConnectedToPeer(peerId)
|
||||
echo "= Found ", len(peers), " connected to peer ", parts[1]
|
||||
for item in peers:
|
||||
var peer = item.peer
|
||||
var addresses = newSeq[string]()
|
||||
var relay = false
|
||||
for a in item.addresses:
|
||||
addresses.add($a)
|
||||
if a.protoName() == "/p2p-circuit":
|
||||
relay = true
|
||||
break
|
||||
if relay:
|
||||
echo peer.pretty(), " * ", " [", addresses.join(", "), "]"
|
||||
else:
|
||||
echo peer.pretty(), " [", addresses.join(", "), "]"
|
||||
elif line.startsWith("/exit"):
|
||||
break
|
||||
else:
|
||||
var msg = line & "\r\n"
|
||||
echo "<< ", line
|
||||
var pending = newSeq[Future[int]]()
|
||||
for item in udata.remotes:
|
||||
pending.add(item.write(msg))
|
||||
if len(pending) > 0:
|
||||
var results = await all(pending)
|
||||
except CatchableError as err:
|
||||
echo err.msg
|
||||
|
||||
proc main() {.async.} =
|
||||
var data = new CustomData
|
||||
data.remotes = newSeq[StreamTransport]()
|
||||
|
||||
var (rfd, wfd) = createAsyncPipe()
|
||||
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
||||
raise newException(ValueError, "Could not initialize pipe!")
|
||||
|
||||
data.consoleFd = rfd
|
||||
|
||||
data.serveFut = serveThread(data)
|
||||
var thread: Thread[AsyncFD]
|
||||
thread.createThread(threadMain, wfd)
|
||||
|
||||
echo "= Starting P2P node"
|
||||
data.api = await newDaemonApi({DHTFull, Bootstrap})
|
||||
await sleepAsync(3000)
|
||||
var id = await data.api.identity()
|
||||
|
||||
proc streamHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
|
||||
echo "= Peer ", stream.peer.pretty(), " joined chat"
|
||||
data.remotes.add(stream.transp)
|
||||
while true:
|
||||
var line = await stream.transp.readLine()
|
||||
if len(line) == 0:
|
||||
break
|
||||
echo ">> ", line
|
||||
|
||||
await data.api.addHandler(ServerProtocols, streamHandler)
|
||||
echo "= Your PeerId is ", id.peer.pretty()
|
||||
await data.serveFut
|
||||
|
||||
when isMainModule:
|
||||
waitFor(main())
|
||||
@@ -1,43 +0,0 @@
|
||||
# Table of Contents
|
||||
- [Introduction](#introduction)
|
||||
- [Prerequisites](#prerequisites)
|
||||
- [Installation](#installation)
|
||||
- [Script](#script)
|
||||
- [Examples](#examples)
|
||||
|
||||
# Introduction
|
||||
This is a libp2p-backed daemon wrapping the functionalities of go-libp2p for use in Nim. <br>
|
||||
For more information about the go daemon, check out [this repository](https://github.com/libp2p/go-libp2p-daemon).
|
||||
> **Required only** for running the tests.
|
||||
|
||||
# Prerequisites
|
||||
Go with version `1.16.0`
|
||||
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.
|
||||
|
||||
# Installation
|
||||
Run the build script while having the `go` command pointing to the correct Go version.
|
||||
```sh
|
||||
./scripts/build_p2pd.sh
|
||||
```
|
||||
`build_p2pd.sh` will not rebuild unless needed. If you already have the newest binary and you want to force the rebuild, use:
|
||||
```sh
|
||||
./scripts/build_p2pd.sh -f
|
||||
```
|
||||
Or:
|
||||
```sh
|
||||
./scripts/build_p2pd.sh --force
|
||||
```
|
||||
|
||||
If everything goes correctly, the binary (`p2pd`) should be built and placed in the `$GOPATH/bin` directory.
|
||||
If you're having issues, head into [our discord](https://discord.com/channels/864066763682218004/1115526869769535629) and ask for assistance.
|
||||
|
||||
After successfully building the binary, remember to add it to your path so it can be found. You can do that by running:
|
||||
```sh
|
||||
export PATH="$PATH:$HOME/go/bin"
|
||||
```
|
||||
> **Tip:** To make this change permanent, add the command above to your `.bashrc` file.
|
||||
|
||||
# Examples
|
||||
Examples can be found in the [examples folder](https://github.com/status-im/nim-libp2p/tree/readme/examples/go-daemon)
|
||||
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
import chronos, nimcrypto, strutils, os
|
||||
import ../../libp2p/daemon/daemonapi
|
||||
|
||||
const PubSubTopic = "test-net"
|
||||
|
||||
proc main(bn: string) {.async.} =
|
||||
echo "= Starting P2P node"
|
||||
var bootnodes = bn.split(",")
|
||||
var api = await newDaemonApi(
|
||||
{DHTFull, PSGossipSub, WaitBootstrap}, bootstrapNodes = bootnodes, peersRequired = 1
|
||||
)
|
||||
var id = await api.identity()
|
||||
echo "= P2P node ", id.peer.pretty(), " started:"
|
||||
for item in id.addresses:
|
||||
echo item
|
||||
|
||||
proc pubsubLogger(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async.} =
|
||||
let msglen = len(message.data)
|
||||
echo "= Recieved pubsub message with length ",
|
||||
msglen, " bytes from peer ", message.peer.pretty(), ": "
|
||||
var strdata = cast[string](message.data)
|
||||
echo strdata
|
||||
result = true
|
||||
|
||||
var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger)
|
||||
|
||||
# Waiting for gossipsub interval
|
||||
while true:
|
||||
var peers = await api.pubsubListPeers(PubSubTopic)
|
||||
if len(peers) > 0:
|
||||
break
|
||||
await sleepAsync(1000)
|
||||
|
||||
var data = "HELLO\r\n"
|
||||
var msgData = cast[seq[byte]](data)
|
||||
await api.pubsubPublish(PubSubTopic, msgData)
|
||||
|
||||
when isMainModule:
|
||||
if paramCount() != 1:
|
||||
echo "Please supply bootnodes!"
|
||||
else:
|
||||
waitFor(main(paramStr(1)))
|
||||
while true:
|
||||
poll()
|
||||
@@ -49,12 +49,6 @@ proc tutorialToMd(filename: string) =
|
||||
task testnative, "Runs libp2p native tests":
|
||||
runTest("testnative")
|
||||
|
||||
task testdaemon, "Runs daemon tests":
|
||||
runTest("testdaemon")
|
||||
|
||||
task testinterop, "Runs interop tests":
|
||||
runTest("testinterop")
|
||||
|
||||
task testpubsub, "Runs pubsub tests":
|
||||
runTest("pubsub/testpubsub", "-d:libp2p_gossipsub_1_4")
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,156 +0,0 @@
|
||||
# Nim-Libp2p
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
## This module implements Pool of StreamTransport.
|
||||
import chronos
|
||||
|
||||
const DefaultPoolSize* = 8 ## Default pool size
|
||||
|
||||
type
|
||||
ConnectionFlags = enum
|
||||
None
|
||||
Busy
|
||||
|
||||
PoolItem = object
|
||||
transp*: StreamTransport
|
||||
flags*: set[ConnectionFlags]
|
||||
|
||||
PoolState = enum
|
||||
Connecting
|
||||
Connected
|
||||
Closing
|
||||
Closed
|
||||
|
||||
TransportPool* = ref object ## Transports pool object
|
||||
transports: seq[PoolItem]
|
||||
busyCount: int
|
||||
state: PoolState
|
||||
bufferSize: int
|
||||
event: AsyncEvent
|
||||
|
||||
TransportPoolError* = object of AsyncError
|
||||
|
||||
proc waitAll[T](futs: seq[Future[T]]): Future[void] =
|
||||
## Performs waiting for all Future[T].
|
||||
var counter = len(futs)
|
||||
var retFuture = newFuture[void]("connpool.waitAllConnections")
|
||||
proc cb(udata: pointer) =
|
||||
dec(counter)
|
||||
if counter == 0:
|
||||
retFuture.complete()
|
||||
|
||||
for fut in futs:
|
||||
fut.addCallback(cb)
|
||||
return retFuture
|
||||
|
||||
proc newPool*(
|
||||
address: TransportAddress,
|
||||
poolsize: int = DefaultPoolSize,
|
||||
bufferSize = DefaultStreamBufferSize,
|
||||
): Future[TransportPool] {.async: (raises: [CancelledError]).} =
|
||||
## Establish pool of connections to address ``address`` with size
|
||||
## ``poolsize``.
|
||||
var pool = new TransportPool
|
||||
pool.bufferSize = bufferSize
|
||||
pool.transports = newSeq[PoolItem](poolsize)
|
||||
var conns = newSeq[Future[StreamTransport]](poolsize)
|
||||
pool.state = Connecting
|
||||
for i in 0 ..< poolsize:
|
||||
conns[i] = connect(address, bufferSize)
|
||||
# Waiting for all connections to be established.
|
||||
await waitAll(conns)
|
||||
# Checking connections and preparing pool.
|
||||
for i in 0 ..< poolsize:
|
||||
if conns[i].failed:
|
||||
raise conns[i].error
|
||||
else:
|
||||
let transp = conns[i].read()
|
||||
let item = PoolItem(transp: transp)
|
||||
pool.transports[i] = item
|
||||
# Setup available connections event
|
||||
pool.event = newAsyncEvent()
|
||||
pool.state = Connected
|
||||
result = pool
|
||||
|
||||
proc acquire*(
|
||||
pool: TransportPool
|
||||
): Future[StreamTransport] {.async: (raises: [CancelledError, TransportPoolError]).} =
|
||||
## Acquire non-busy connection from pool ``pool``.
|
||||
var transp: StreamTransport
|
||||
if pool.state in {Connected}:
|
||||
while true:
|
||||
if pool.busyCount < len(pool.transports):
|
||||
for conn in pool.transports.mitems():
|
||||
if Busy notin conn.flags:
|
||||
conn.flags.incl(Busy)
|
||||
inc(pool.busyCount)
|
||||
transp = conn.transp
|
||||
break
|
||||
else:
|
||||
await pool.event.wait()
|
||||
pool.event.clear()
|
||||
|
||||
if not isNil(transp):
|
||||
break
|
||||
else:
|
||||
raise newException(TransportPoolError, "Pool is not ready!")
|
||||
result = transp
|
||||
|
||||
proc release*(
|
||||
pool: TransportPool, transp: StreamTransport
|
||||
) {.async: (raises: [TransportPoolError]).} =
|
||||
## Release connection ``transp`` back to pool ``pool``.
|
||||
if pool.state in {Connected, Closing}:
|
||||
var found = false
|
||||
for conn in pool.transports.mitems():
|
||||
if conn.transp == transp:
|
||||
conn.flags.excl(Busy)
|
||||
dec(pool.busyCount)
|
||||
pool.event.fire()
|
||||
found = true
|
||||
break
|
||||
if not found:
|
||||
raise newException(TransportPoolError, "Transport not bound to pool!")
|
||||
else:
|
||||
raise newException(TransportPoolError, "Pool is not ready!")
|
||||
|
||||
proc join*(
|
||||
pool: TransportPool
|
||||
) {.async: (raises: [TransportPoolError, CancelledError]).} =
|
||||
## Waiting for all connection to become available.
|
||||
if pool.state in {Connected, Closing}:
|
||||
while true:
|
||||
if pool.busyCount == 0:
|
||||
break
|
||||
else:
|
||||
await pool.event.wait()
|
||||
pool.event.clear()
|
||||
elif pool.state == Connecting:
|
||||
raise newException(TransportPoolError, "Pool is not ready!")
|
||||
|
||||
proc close*(
|
||||
pool: TransportPool
|
||||
) {.async: (raises: [TransportPoolError, CancelledError]).} =
|
||||
## Closes transports pool ``pool`` and release all resources.
|
||||
if pool.state == Connected:
|
||||
pool.state = Closing
|
||||
# Waiting for all transports to become available.
|
||||
await pool.join()
|
||||
# Closing all transports
|
||||
var pending = newSeq[Future[void]](len(pool.transports))
|
||||
for i in 0 ..< len(pool.transports):
|
||||
let transp = pool.transports[i].transp
|
||||
transp.close()
|
||||
pending[i] = transp.join()
|
||||
# Waiting for all transports to be closed
|
||||
await waitAll(pending)
|
||||
# Mark pool as `Closed`.
|
||||
pool.state = Closed
|
||||
@@ -93,6 +93,10 @@ proc parseFullAddress*(ma: MultiAddress): MaResult[(PeerId, MultiAddress)] =
|
||||
proc parseFullAddress*(ma: string | seq[byte]): MaResult[(PeerId, MultiAddress)] =
|
||||
parseFullAddress(?MultiAddress.init(ma))
|
||||
|
||||
proc toFullAddress*(peerId: PeerId, ma: MultiAddress): MaResult[MultiAddress] =
|
||||
let peerIdPart = ?MultiAddress.init(multiCodec("p2p"), peerId.data)
|
||||
concat(ma, peerIdPart)
|
||||
|
||||
proc new*(
|
||||
p: typedesc[PeerInfo],
|
||||
key: PrivateKey,
|
||||
|
||||
133
libp2p/protocols/mix/entry_connection.nim
Normal file
133
libp2p/protocols/mix/entry_connection.nim
Normal file
@@ -0,0 +1,133 @@
|
||||
import hashes, chronos, stew/byteutils, results, chronicles
|
||||
import ../../stream/connection
|
||||
import ../../varint
|
||||
import ../../utils/sequninit
|
||||
import ./mix_protocol
|
||||
from fragmentation import DataSize
|
||||
|
||||
type MixDialer* = proc(
|
||||
msg: seq[byte], codec: string, destination: MixDestination
|
||||
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).}
|
||||
|
||||
type MixEntryConnection* = ref object of Connection
|
||||
destination: MixDestination
|
||||
codec: string
|
||||
mixDialer: MixDialer
|
||||
|
||||
func shortLog*(conn: MixEntryConnection): string =
|
||||
if conn == nil:
|
||||
"MixEntryConnection(nil)"
|
||||
else:
|
||||
"MixEntryConnection(" & $conn.destination & ")"
|
||||
|
||||
chronicles.formatIt(MixEntryConnection):
|
||||
shortLog(it)
|
||||
|
||||
method readOnce*(
|
||||
s: MixEntryConnection, pbytes: pointer, nbytes: int
|
||||
): Future[int] {.async: (raises: [CancelledError, LPStreamError]), public.} =
|
||||
# TODO: implement
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
method readExactly*(
|
||||
s: MixEntryConnection, pbytes: pointer, nbytes: int
|
||||
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
|
||||
# TODO: implement
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
method readLine*(
|
||||
s: MixEntryConnection, limit = 0, sep = "\r\n"
|
||||
): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} =
|
||||
# TODO: implement
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
method readVarint*(
|
||||
conn: MixEntryConnection
|
||||
): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} =
|
||||
# TODO: implement
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
method readLp*(
|
||||
s: MixEntryConnection, maxSize: int
|
||||
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} =
|
||||
# TODO: implement
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
method write*(
|
||||
self: MixEntryConnection, msg: seq[byte]
|
||||
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
|
||||
self.mixDialer(msg, self.codec, self.destination)
|
||||
|
||||
proc write*(
|
||||
self: MixEntryConnection, msg: string
|
||||
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
|
||||
self.write(msg.toBytes())
|
||||
|
||||
method writeLp*(
|
||||
self: MixEntryConnection, msg: openArray[byte]
|
||||
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
|
||||
if msg.len() > DataSize:
|
||||
let fut = newFuture[void]()
|
||||
fut.fail(
|
||||
newException(LPStreamError, "exceeds max msg size of " & $DataSize & " bytes")
|
||||
)
|
||||
return fut
|
||||
|
||||
## Write `msg` with a varint-encoded length prefix
|
||||
let vbytes = PB.toBytes(msg.len().uint64)
|
||||
var buf = newSeqUninit[byte](msg.len() + vbytes.len)
|
||||
buf[0 ..< vbytes.len] = vbytes.toOpenArray()
|
||||
buf[vbytes.len ..< buf.len] = msg
|
||||
|
||||
self.write(buf)
|
||||
|
||||
method writeLp*(
|
||||
self: MixEntryConnection, msg: string
|
||||
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
|
||||
self.writeLp(msg.toOpenArrayByte(0, msg.high))
|
||||
|
||||
proc shortLog*(self: MixEntryConnection): string {.raises: [].} =
|
||||
"[MixEntryConnection] Destination: " & $self.destination
|
||||
|
||||
method closeImpl*(
|
||||
self: MixEntryConnection
|
||||
): Future[void] {.async: (raises: [], raw: true).} =
|
||||
let fut = newFuture[void]()
|
||||
fut.complete()
|
||||
return fut
|
||||
|
||||
func hash*(self: MixEntryConnection): Hash =
|
||||
hash($self.destination)
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
proc setShortAgent*(self: MixEntryConnection, shortAgent: string) =
|
||||
discard
|
||||
|
||||
proc new*(
|
||||
T: typedesc[MixEntryConnection],
|
||||
srcMix: MixProtocol,
|
||||
destination: MixDestination,
|
||||
codec: string,
|
||||
): T {.raises: [].} =
|
||||
var instance = T()
|
||||
instance.destination = destination
|
||||
instance.codec = codec
|
||||
instance.mixDialer = proc(
|
||||
msg: seq[byte], codec: string, dest: MixDestination
|
||||
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
await srcMix.anonymizeLocalProtocolSend(
|
||||
nil, msg, codec, dest, 0 # TODO: set incoming queue for replies and surbs
|
||||
)
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
instance.shortAgent = connection.shortAgent
|
||||
|
||||
instance
|
||||
|
||||
proc toConnection*(
|
||||
srcMix: MixProtocol, destination: MixDestination, codec: string
|
||||
): Result[Connection, string] {.gcsafe, raises: [].} =
|
||||
## Create a stream to send and optionally receive responses.
|
||||
## Under the hood it will wrap the message in a sphinx packet
|
||||
## and send it via a random mix path.
|
||||
ok(MixEntryConnection.new(srcMix, destination, codec))
|
||||
36
libp2p/protocols/mix/exit_layer.nim
Normal file
36
libp2p/protocols/mix/exit_layer.nim
Normal file
@@ -0,0 +1,36 @@
|
||||
import chronicles, chronos, metrics
|
||||
import ../../builders
|
||||
import ../../stream/connection
|
||||
import ./mix_metrics
|
||||
|
||||
type ExitLayer* = object
|
||||
switch: Switch
|
||||
|
||||
proc init*(T: typedesc[ExitLayer], switch: Switch): T =
|
||||
ExitLayer(switch: switch)
|
||||
|
||||
proc onMessage*(
|
||||
self: ExitLayer,
|
||||
codec: string,
|
||||
message: seq[byte],
|
||||
destAddr: MultiAddress,
|
||||
destPeerId: PeerId,
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
# If dialing destination fails, no response is returned to
|
||||
# the sender, so, flow can just end here. Only log errors
|
||||
# for now
|
||||
# https://github.com/vacp2p/mix/issues/86
|
||||
|
||||
try:
|
||||
let destConn = await self.switch.dial(destPeerId, @[destAddr], codec)
|
||||
defer:
|
||||
await destConn.close()
|
||||
await destConn.write(message)
|
||||
except LPStreamError as exc:
|
||||
error "Stream error while writing to next hop: ", err = exc.msg
|
||||
mix_messages_error.inc(labelValues = ["ExitLayer", "LPSTREAM_ERR"])
|
||||
except DialFailedError as exc:
|
||||
error "Failed to dial next hop: ", err = exc.msg
|
||||
mix_messages_error.inc(labelValues = ["ExitLayer", "DIAL_FAILED"])
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
95
libp2p/protocols/mix/fragmentation.nim
Normal file
95
libp2p/protocols/mix/fragmentation.nim
Normal file
@@ -0,0 +1,95 @@
|
||||
import ./[serialization, seqno_generator]
|
||||
import results, stew/endians2
|
||||
import ../../peerid
|
||||
|
||||
const PaddingLengthSize* = 2
|
||||
const SeqNoSize* = 4
|
||||
const DataSize* = MessageSize - PaddingLengthSize - SeqNoSize
|
||||
|
||||
# Unpadding and reassembling messages will be handled by the top-level applications.
|
||||
# Although padding and splitting messages could also be managed at that level, we
|
||||
# implement it here to clarify the sender's logic.
|
||||
# This is crucial as the sender is responsible for wrapping messages in Sphinx packets.
|
||||
|
||||
type MessageChunk* = object
|
||||
paddingLength: uint16
|
||||
data: seq[byte]
|
||||
seqNo: uint32
|
||||
|
||||
proc init*(
|
||||
T: typedesc[MessageChunk], paddingLength: uint16, data: seq[byte], seqNo: uint32
|
||||
): T =
|
||||
T(paddingLength: paddingLength, data: data, seqNo: seqNo)
|
||||
|
||||
proc get*(msgChunk: MessageChunk): (uint16, seq[byte], uint32) =
|
||||
(msgChunk.paddingLength, msgChunk.data, msgChunk.seqNo)
|
||||
|
||||
proc serialize*(msgChunk: MessageChunk): seq[byte] =
|
||||
let
|
||||
paddingBytes = msgChunk.paddingLength.toBytesBE()
|
||||
seqNoBytes = msgChunk.seqNo.toBytesBE()
|
||||
|
||||
doAssert msgChunk.data.len == DataSize,
|
||||
"Padded data must be exactly " & $DataSize & " bytes"
|
||||
|
||||
return @paddingBytes & msgChunk.data & @seqNoBytes
|
||||
|
||||
proc deserialize*(T: typedesc[MessageChunk], data: openArray[byte]): Result[T, string] =
|
||||
if data.len != MessageSize:
|
||||
return err("Data must be exactly " & $MessageSize & " bytes")
|
||||
|
||||
let
|
||||
paddingLength = uint16.fromBytesBE(data[0 .. PaddingLengthSize - 1])
|
||||
chunk = data[PaddingLengthSize .. (PaddingLengthSize + DataSize - 1)]
|
||||
seqNo = uint32.fromBytesBE(data[PaddingLengthSize + DataSize ..^ 1])
|
||||
|
||||
ok(T(paddingLength: paddingLength, data: chunk, seqNo: seqNo))
|
||||
|
||||
proc ceilDiv*(a, b: int): int =
|
||||
(a + b - 1) div b
|
||||
|
||||
proc addPadding*(messageBytes: seq[byte], seqNo: SeqNo): MessageChunk =
|
||||
## Pads messages smaller than DataSize
|
||||
let paddingLength = uint16(DataSize - messageBytes.len)
|
||||
let paddedData =
|
||||
if paddingLength > 0:
|
||||
let paddingBytes = newSeq[byte](paddingLength)
|
||||
paddingBytes & messageBytes
|
||||
else:
|
||||
messageBytes
|
||||
MessageChunk(paddingLength: paddingLength, data: paddedData, seqNo: seqNo)
|
||||
|
||||
proc addPadding*(messageBytes: seq[byte], peerId: PeerId): MessageChunk =
|
||||
## Pads messages smaller than DataSize
|
||||
var seqNoGen = SeqNo.init(peerId)
|
||||
seqNoGen.generate(messageBytes)
|
||||
messageBytes.addPadding(seqNoGen)
|
||||
|
||||
proc removePadding*(msgChunk: MessageChunk): Result[seq[byte], string] =
|
||||
let msgLength = len(msgChunk.data) - int(msgChunk.paddingLength)
|
||||
if msgLength < 0:
|
||||
return err("Invalid padding length")
|
||||
|
||||
ok(msgChunk.data[msgChunk.paddingLength ..^ 1])
|
||||
|
||||
proc padAndChunkMessage*(messageBytes: seq[byte], peerId: PeerId): seq[MessageChunk] =
|
||||
var seqNoGen = SeqNo.init(peerId)
|
||||
seqNoGen.generate(messageBytes)
|
||||
|
||||
var chunks: seq[MessageChunk] = @[]
|
||||
|
||||
# Split to chunks
|
||||
let totalChunks = max(1, ceilDiv(messageBytes.len, DataSize))
|
||||
# Ensure at least one chunk is generated
|
||||
for i in 0 ..< totalChunks:
|
||||
let
|
||||
startIdx = i * DataSize
|
||||
endIdx = min(startIdx + DataSize, messageBytes.len)
|
||||
chunkData = messageBytes[startIdx .. endIdx - 1]
|
||||
msgChunk = chunkData.addPadding(seqNoGen)
|
||||
|
||||
chunks.add(msgChunk)
|
||||
|
||||
seqNoGen.inc()
|
||||
|
||||
return chunks
|
||||
13
libp2p/protocols/mix/mix_metrics.nim
Normal file
13
libp2p/protocols/mix/mix_metrics.nim
Normal file
@@ -0,0 +1,13 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import metrics
|
||||
|
||||
declarePublicCounter mix_messages_recvd, "number of mix messages received", ["type"]
|
||||
|
||||
declarePublicCounter mix_messages_forwarded,
|
||||
"number of mix messages forwarded", ["type"]
|
||||
|
||||
declarePublicCounter mix_messages_error,
|
||||
"number of mix messages failed processing", ["type", "error"]
|
||||
|
||||
declarePublicGauge mix_pool_size, "number of nodes in the pool"
|
||||
318
libp2p/protocols/mix/mix_node.nim
Normal file
318
libp2p/protocols/mix/mix_node.nim
Normal file
@@ -0,0 +1,318 @@
|
||||
import os, results, strformat, sugar, sequtils
|
||||
import std/streams
|
||||
import ../../crypto/[crypto, curve25519, secp]
|
||||
import ../../[multiaddress, multicodec, peerid, peerinfo]
|
||||
import ./[serialization, curve25519, multiaddr]
|
||||
|
||||
const MixNodeInfoSize* =
|
||||
AddrSize + (2 * FieldElementSize) + (SkRawPublicKeySize + SkRawPrivateKeySize)
|
||||
const MixPubInfoSize* = AddrSize + FieldElementSize + SkRawPublicKeySize
|
||||
|
||||
type MixNodeInfo* = object
|
||||
peerId*: PeerId
|
||||
multiAddr*: MultiAddress
|
||||
mixPubKey*: FieldElement
|
||||
mixPrivKey*: FieldElement
|
||||
libp2pPubKey*: SkPublicKey
|
||||
libp2pPrivKey*: SkPrivateKey
|
||||
|
||||
proc initMixNodeInfo*(
|
||||
peerId: PeerId,
|
||||
multiAddr: MultiAddress,
|
||||
mixPubKey, mixPrivKey: FieldElement,
|
||||
libp2pPubKey: SkPublicKey,
|
||||
libp2pPrivKey: SkPrivateKey,
|
||||
): MixNodeInfo =
|
||||
MixNodeInfo(
|
||||
peerId: peerId,
|
||||
multiAddr: multiAddr,
|
||||
mixPubKey: mixPubKey,
|
||||
mixPrivKey: mixPrivKey,
|
||||
libp2pPubKey: libp2pPubKey,
|
||||
libp2pPrivKey: libp2pPrivKey,
|
||||
)
|
||||
|
||||
proc get*(
|
||||
info: MixNodeInfo
|
||||
): (PeerId, MultiAddress, FieldElement, FieldElement, SkPublicKey, SkPrivateKey) =
|
||||
(
|
||||
info.peerId, info.multiAddr, info.mixPubKey, info.mixPrivKey, info.libp2pPubKey,
|
||||
info.libp2pPrivKey,
|
||||
)
|
||||
|
||||
proc serialize*(nodeInfo: MixNodeInfo): Result[seq[byte], string] =
|
||||
let addrBytes = multiAddrToBytes(nodeInfo.peerId, nodeInfo.multiAddr).valueOr:
|
||||
return err("Error in multiaddress conversion to bytes: " & error)
|
||||
|
||||
let
|
||||
mixPubKeyBytes = fieldElementToBytes(nodeInfo.mixPubKey)
|
||||
mixPrivKeyBytes = fieldElementToBytes(nodeInfo.mixPrivKey)
|
||||
libp2pPubKeyBytes = nodeInfo.libp2pPubKey.getBytes()
|
||||
libp2pPrivKeyBytes = nodeInfo.libp2pPrivKey.getBytes()
|
||||
|
||||
return ok(
|
||||
addrBytes & mixPubKeyBytes & mixPrivKeyBytes & libp2pPubKeyBytes & libp2pPrivKeyBytes
|
||||
)
|
||||
|
||||
proc deserialize*(T: typedesc[MixNodeInfo], data: openArray[byte]): Result[T, string] =
|
||||
if len(data) != MixNodeInfoSize:
|
||||
return
|
||||
err("Serialized Mix node info must be exactly " & $MixNodeInfoSize & " bytes")
|
||||
|
||||
let (peerId, multiAddr) = bytesToMultiAddr(data[0 .. AddrSize - 1]).valueOr:
|
||||
return err("Error in multiaddress conversion to bytes: " & error)
|
||||
|
||||
let mixPubKey = bytesToFieldElement(
|
||||
data[AddrSize .. (AddrSize + FieldElementSize - 1)]
|
||||
).valueOr:
|
||||
return err("Mix public key deserialize error: " & error)
|
||||
|
||||
let mixPrivKey = bytesToFieldElement(
|
||||
data[(AddrSize + FieldElementSize) .. (AddrSize + (2 * FieldElementSize) - 1)]
|
||||
).valueOr:
|
||||
return err("Mix private key deserialize error: " & error)
|
||||
|
||||
let libp2pPubKey = SkPublicKey.init(
|
||||
data[
|
||||
AddrSize + (2 * FieldElementSize) ..
|
||||
AddrSize + (2 * FieldElementSize) + SkRawPublicKeySize - 1
|
||||
]
|
||||
).valueOr:
|
||||
return err("Failed to initialize libp2p public key")
|
||||
|
||||
let libp2pPrivKey = SkPrivateKey.init(
|
||||
data[AddrSize + (2 * FieldElementSize) + SkRawPublicKeySize ..^ 1]
|
||||
).valueOr:
|
||||
return err("Failed to initialize libp2p private key")
|
||||
|
||||
ok(
|
||||
T(
|
||||
peerId: peerId,
|
||||
multiAddr: multiAddr,
|
||||
mixPubKey: mixPubKey,
|
||||
mixPrivKey: mixPrivKey,
|
||||
libp2pPubKey: libp2pPubKey,
|
||||
libp2pPrivKey: libp2pPrivKey,
|
||||
)
|
||||
)
|
||||
|
||||
proc writeToFile*(
|
||||
node: MixNodeInfo, index: int, nodeInfoFolderPath: string = "./nodeInfo"
|
||||
): Result[void, string] =
|
||||
if not dirExists(nodeInfoFolderPath):
|
||||
createDir(nodeInfoFolderPath)
|
||||
let filename = nodeInfoFolderPath / fmt"mixNode_{index}"
|
||||
var file = newFileStream(filename, fmWrite)
|
||||
if file == nil:
|
||||
return err("Failed to create file stream for " & filename)
|
||||
defer:
|
||||
file.close()
|
||||
|
||||
let serializedData = node.serialize().valueOr:
|
||||
return err("Failed to serialize mix node info: " & error)
|
||||
|
||||
file.writeData(addr serializedData[0], serializedData.len)
|
||||
return ok()
|
||||
|
||||
proc readFromFile*(
|
||||
T: typedesc[MixNodeInfo], index: int, nodeInfoFolderPath: string = "./nodeInfo"
|
||||
): Result[T, string] =
|
||||
let filename = nodeInfoFolderPath / fmt"mixNode_{index}"
|
||||
if not fileExists(filename):
|
||||
return err("File does not exist")
|
||||
var file = newFileStream(filename, fmRead)
|
||||
if file == nil:
|
||||
return err(
|
||||
"Failed to open file: " & filename &
|
||||
". Check permissions or if the path is correct."
|
||||
)
|
||||
defer:
|
||||
file.close()
|
||||
|
||||
let data = ?file.readAll().catch().mapErr(x => "File read error: " & x.msg)
|
||||
if data.len != MixNodeInfoSize:
|
||||
return err(
|
||||
"Invalid data size for MixNodeInfo: expected " & $MixNodeInfoSize &
|
||||
" bytes, but got " & $(data.len) & " bytes."
|
||||
)
|
||||
let dMixNodeInfo = MixNodeInfo.deserialize(cast[seq[byte]](data)).valueOr:
|
||||
return err("Mix node info deserialize error: " & error)
|
||||
return ok(dMixNodeInfo)
|
||||
|
||||
proc deleteNodeInfoFolder*(nodeInfoFolderPath: string = "./nodeInfo") =
|
||||
## Deletes the folder that stores serialized mix node info files
|
||||
## along with all its contents, if the folder exists.
|
||||
if dirExists(nodeInfoFolderPath):
|
||||
removeDir(nodeInfoFolderPath)
|
||||
|
||||
type MixPubInfo* = object
|
||||
peerId*: PeerId
|
||||
multiAddr*: MultiAddress
|
||||
mixPubKey*: FieldElement
|
||||
libp2pPubKey*: SkPublicKey
|
||||
|
||||
proc init*(
|
||||
T: typedesc[MixPubInfo],
|
||||
peerId: PeerId,
|
||||
multiAddr: MultiAddress,
|
||||
mixPubKey: FieldElement,
|
||||
libp2pPubKey: SkPublicKey,
|
||||
): T =
|
||||
T(
|
||||
peerId: PeerId,
|
||||
multiAddr: multiAddr,
|
||||
mixPubKey: mixPubKey,
|
||||
libp2pPubKey: libp2pPubKey,
|
||||
)
|
||||
|
||||
proc get*(info: MixPubInfo): (PeerId, MultiAddress, FieldElement, SkPublicKey) =
|
||||
(info.peerId, info.multiAddr, info.mixPubKey, info.libp2pPubKey)
|
||||
|
||||
proc serialize*(nodeInfo: MixPubInfo): Result[seq[byte], string] =
|
||||
let addrBytes = multiAddrToBytes(nodeInfo.peerId, nodeInfo.multiAddr).valueOr:
|
||||
return err("Error in multiaddress conversion to bytes: " & error)
|
||||
|
||||
let
|
||||
mixPubKeyBytes = fieldElementToBytes(nodeInfo.mixPubKey)
|
||||
libp2pPubKeyBytes = nodeInfo.libp2pPubKey.getBytes()
|
||||
|
||||
return ok(addrBytes & mixPubKeyBytes & libp2pPubKeyBytes)
|
||||
|
||||
proc deserialize*(T: typedesc[MixPubInfo], data: openArray[byte]): Result[T, string] =
|
||||
if len(data) != MixPubInfoSize:
|
||||
return
|
||||
err("Serialized mix public info must be exactly " & $MixPubInfoSize & " bytes")
|
||||
|
||||
let (peerId, multiAddr) = bytesToMultiAddr(data[0 .. AddrSize - 1]).valueOr:
|
||||
return err("Error in bytes to multiaddress conversion: " & error)
|
||||
|
||||
let mixPubKey = bytesToFieldElement(
|
||||
data[AddrSize .. (AddrSize + FieldElementSize - 1)]
|
||||
).valueOr:
|
||||
return err("Mix public key deserialize error: " & error)
|
||||
|
||||
let libp2pPubKey = SkPublicKey.init(data[(AddrSize + FieldElementSize) ..^ 1]).valueOr:
|
||||
return err("Failed to initialize libp2p public key: ")
|
||||
|
||||
ok(
|
||||
MixPubInfo(
|
||||
peerId: peerId,
|
||||
multiAddr: multiAddr,
|
||||
mixPubKey: mixPubKey,
|
||||
libp2pPubKey: libp2pPubKey,
|
||||
)
|
||||
)
|
||||
|
||||
proc writeToFile*(
|
||||
node: MixPubInfo, index: int, pubInfoFolderPath: string = "./pubInfo"
|
||||
): Result[void, string] =
|
||||
if not dirExists(pubInfoFolderPath):
|
||||
createDir(pubInfoFolderPath)
|
||||
let filename = pubInfoFolderPath / fmt"mixNode_{index}"
|
||||
var file = newFileStream(filename, fmWrite)
|
||||
if file == nil:
|
||||
return err("Failed to create file stream for " & filename)
|
||||
defer:
|
||||
file.close()
|
||||
|
||||
let serializedData = node.serialize().valueOr:
|
||||
return err("Failed to serialize mix pub info: " & error)
|
||||
|
||||
file.writeData(unsafeAddr serializedData[0], serializedData.len)
|
||||
return ok()
|
||||
|
||||
proc readFromFile*(
|
||||
T: typedesc[MixPubInfo], index: int, pubInfoFolderPath: string = "./pubInfo"
|
||||
): Result[T, string] =
|
||||
let filename = pubInfoFolderPath / fmt"mixNode_{index}"
|
||||
if not fileExists(filename):
|
||||
return err("File does not exist")
|
||||
var file = newFileStream(filename, fmRead)
|
||||
if file == nil:
|
||||
return err(
|
||||
"Failed to open file: " & filename &
|
||||
". Check permissions or if the path is correct."
|
||||
)
|
||||
defer:
|
||||
file.close()
|
||||
let data = ?file.readAll().catch().mapErr(x => "File read error: " & x.msg)
|
||||
if data.len != MixPubInfoSize:
|
||||
return err(
|
||||
"Invalid data size for MixNodeInfo: expected " & $MixNodeInfoSize &
|
||||
" bytes, but got " & $(data.len) & " bytes."
|
||||
)
|
||||
let dMixPubInfo = MixPubInfo.deserialize(cast[seq[byte]](data)).valueOr:
|
||||
return err("Mix pub info deserialize error: " & error)
|
||||
return ok(dMixPubInfo)
|
||||
|
||||
proc deletePubInfoFolder*(pubInfoFolderPath: string = "./pubInfo") =
|
||||
## Deletes the folder containing serialized public mix node info
|
||||
## and all files inside it, if the folder exists.
|
||||
if dirExists(pubInfoFolderPath):
|
||||
removeDir(pubInfoFolderPath)
|
||||
|
||||
type MixNodes* = seq[MixNodeInfo]
|
||||
|
||||
proc getMixPubInfoByIndex*(self: MixNodes, index: int): Result[MixPubInfo, string] =
|
||||
if index < 0 or index >= self.len:
|
||||
return err("Index must be between 0 and " & $(self.len))
|
||||
ok(
|
||||
MixPubInfo(
|
||||
peerId: self[index].peerId,
|
||||
multiAddr: self[index].multiAddr,
|
||||
mixPubKey: self[index].mixPubKey,
|
||||
libp2pPubKey: self[index].libp2pPubKey,
|
||||
)
|
||||
)
|
||||
|
||||
proc generateMixNodes(
|
||||
count: int, basePort: int = 4242, rng: ref HmacDrbgContext = newRng()
|
||||
): Result[MixNodes, string] =
|
||||
var nodes = newSeq[MixNodeInfo](count)
|
||||
for i in 0 ..< count:
|
||||
let keyPairResult = generateKeyPair()
|
||||
if keyPairResult.isErr:
|
||||
return err("Generate key pair error: " & $keyPairResult.error)
|
||||
let (mixPrivKey, mixPubKey) = keyPairResult.get()
|
||||
|
||||
let
|
||||
rng = newRng()
|
||||
keyPair = SkKeyPair.random(rng[])
|
||||
pubKeyProto = PublicKey(scheme: Secp256k1, skkey: keyPair.pubkey)
|
||||
peerId = PeerId.init(pubKeyProto).get()
|
||||
multiAddr =
|
||||
?MultiAddress.init(fmt"/ip4/0.0.0.0/tcp/{basePort + i}").tryGet().catch().mapErr(
|
||||
x => x.msg
|
||||
)
|
||||
|
||||
nodes[i] = MixNodeInfo(
|
||||
peerId: peerId,
|
||||
multiAddr: multiAddr,
|
||||
mixPubKey: mixPubKey,
|
||||
mixPrivKey: mixPrivKey,
|
||||
libp2pPubKey: keyPair.pubkey,
|
||||
libp2pPrivKey: keyPair.seckey,
|
||||
)
|
||||
|
||||
ok(nodes)
|
||||
|
||||
proc initializeMixNodes*(count: int, basePort: int = 4242): Result[MixNodes, string] =
|
||||
## Creates and initializes a set of mix nodes
|
||||
let mixNodes = generateMixNodes(count, basePort).valueOr:
|
||||
return err("Mix node initialization error: " & error)
|
||||
return ok(mixNodes)
|
||||
|
||||
proc findByPeerId*(self: MixNodes, peerId: PeerId): Result[MixNodeInfo, string] =
|
||||
let filteredNodes = self.filterIt(it.peerId == peerId)
|
||||
if filteredNodes.len != 0:
|
||||
return ok(filteredNodes[0])
|
||||
return err("No node with peer id: " & $peerId)
|
||||
|
||||
proc initMixMultiAddrByIndex*(
|
||||
self: var MixNodes, index: int, peerId: PeerId, multiAddr: MultiAddress
|
||||
): Result[void, string] =
|
||||
if index < 0 or index >= self.len:
|
||||
return err("Index must be between 0 and " & $(self.len))
|
||||
self[index].multiAddr = multiAddr
|
||||
self[index].peerId = peerId
|
||||
ok()
|
||||
392
libp2p/protocols/mix/mix_protocol.nim
Normal file
392
libp2p/protocols/mix/mix_protocol.nim
Normal file
@@ -0,0 +1,392 @@
|
||||
import chronicles, chronos, sequtils, strutils, os, results
|
||||
import std/[strformat, tables], metrics
|
||||
import
|
||||
./[
|
||||
curve25519, fragmentation, mix_message, mix_node, sphinx, serialization,
|
||||
tag_manager, mix_metrics, exit_layer, multiaddr,
|
||||
]
|
||||
import stew/endians2
|
||||
import ../protocol
|
||||
import ../../stream/[connection, lpstream]
|
||||
import ../../[switch, multicodec, peerinfo]
|
||||
|
||||
const MixProtocolID* = "/mix/1.0.0"
|
||||
|
||||
## Mix Protocol defines a decentralized anonymous message routing layer for libp2p networks.
|
||||
## It enables sender anonymity by routing each message through a decentralized mix overlay
|
||||
## network composed of participating libp2p nodes, known as mix nodes. Each message is
|
||||
## routed independently in a stateless manner, allowing other libp2p protocols to selectively
|
||||
## anonymize messages without modifying their core protocol behavior.
|
||||
type MixProtocol* = ref object of LPProtocol
|
||||
mixNodeInfo: MixNodeInfo
|
||||
pubNodeInfo: Table[PeerId, MixPubInfo]
|
||||
switch: Switch
|
||||
tagManager: TagManager
|
||||
exitLayer: ExitLayer
|
||||
rng: ref HmacDrbgContext
|
||||
|
||||
proc loadAllButIndexMixPubInfo*(
|
||||
index, numNodes: int, pubInfoFolderPath: string = "./pubInfo"
|
||||
): Result[Table[PeerId, MixPubInfo], string] =
|
||||
var pubInfoTable = initTable[PeerId, MixPubInfo]()
|
||||
for i in 0 ..< numNodes:
|
||||
if i == index:
|
||||
continue
|
||||
let pubInfo = MixPubInfo.readFromFile(i, pubInfoFolderPath).valueOr:
|
||||
return err("Failed to load pub info from file: " & error)
|
||||
pubInfoTable[pubInfo.peerId] = pubInfo
|
||||
return ok(pubInfoTable)
|
||||
|
||||
proc cryptoRandomInt(rng: ref HmacDrbgContext, max: int): Result[int, string] =
|
||||
if max == 0:
|
||||
return err("Max cannot be zero.")
|
||||
let res = rng[].generate(uint64) mod uint64(max)
|
||||
ok(res.int)
|
||||
|
||||
proc handleMixNodeConnection(
|
||||
mixProto: MixProtocol, conn: Connection
|
||||
) {.async: (raises: [LPStreamError, CancelledError]).} =
|
||||
let receivedBytes =
|
||||
try:
|
||||
await conn.readLp(PacketSize)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
if receivedBytes.len == 0:
|
||||
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "NO_DATA"])
|
||||
return # No data, end of stream
|
||||
|
||||
# Process the packet
|
||||
let (peerId, multiAddr, _, mixPrivKey, _, _) = mixProto.mixNodeInfo.get()
|
||||
|
||||
let sphinxPacket = SphinxPacket.deserialize(receivedBytes).valueOr:
|
||||
error "Sphinx packet deserialization error", err = error
|
||||
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_SPHINX"])
|
||||
return
|
||||
|
||||
let processedSP = processSphinxPacket(sphinxPacket, mixPrivKey, mixProto.tagManager).valueOr:
|
||||
error "Failed to process Sphinx packet", err = error
|
||||
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_SPHINX"])
|
||||
return
|
||||
|
||||
case processedSP.status
|
||||
of Exit:
|
||||
mix_messages_recvd.inc(labelValues = ["Exit"])
|
||||
|
||||
# This is the exit node, forward to destination
|
||||
let msgChunk = MessageChunk.deserialize(processedSP.messageChunk).valueOr:
|
||||
error "Deserialization failed", err = error
|
||||
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
|
||||
return
|
||||
|
||||
let unpaddedMsg = msgChunk.removePadding().valueOr:
|
||||
error "Unpadding message failed", err = error
|
||||
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
|
||||
return
|
||||
|
||||
let deserialized = MixMessage.deserialize(unpaddedMsg).valueOr:
|
||||
error "Deserialization failed", err = error
|
||||
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
|
||||
return
|
||||
|
||||
if processedSP.destination == Hop():
|
||||
error "no destination available"
|
||||
mix_messages_error.inc(labelValues = ["Exit", "NO_DESTINATION"])
|
||||
return
|
||||
|
||||
let destBytes = processedSP.destination.get()
|
||||
|
||||
let (destPeerId, destAddr) = bytesToMultiAddr(destBytes).valueOr:
|
||||
error "Failed to convert bytes to multiaddress", err = error
|
||||
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
|
||||
return
|
||||
|
||||
trace "Exit node - Received mix message",
|
||||
peerId,
|
||||
message = deserialized.message,
|
||||
codec = deserialized.codec,
|
||||
to = destPeerId
|
||||
|
||||
await mixProto.exitLayer.onMessage(
|
||||
deserialized.codec, deserialized.message, destAddr, destPeerId
|
||||
)
|
||||
|
||||
mix_messages_forwarded.inc(labelValues = ["Exit"])
|
||||
of Reply:
|
||||
# TODO: implement
|
||||
discard
|
||||
of Intermediate:
|
||||
trace "# Intermediate: ", peerId, multiAddr
|
||||
# Add delay
|
||||
mix_messages_recvd.inc(labelValues = ["Intermediate"])
|
||||
await sleepAsync(milliseconds(processedSP.delayMs))
|
||||
|
||||
# Forward to next hop
|
||||
let nextHopBytes = processedSP.nextHop.get()
|
||||
|
||||
let (nextPeerId, nextAddr) = bytesToMultiAddr(nextHopBytes).valueOr:
|
||||
error "Failed to convert bytes to multiaddress", err = error
|
||||
mix_messages_error.inc(labelValues = ["Intermediate", "INVALID_DEST"])
|
||||
return
|
||||
|
||||
try:
|
||||
let nextHopConn =
|
||||
await mixProto.switch.dial(nextPeerId, @[nextAddr], MixProtocolID)
|
||||
defer:
|
||||
await nextHopConn.close()
|
||||
|
||||
await nextHopConn.writeLp(processedSP.serializedSphinxPacket)
|
||||
mix_messages_forwarded.inc(labelValues = ["Intermediate"])
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except DialFailedError as exc:
|
||||
error "Failed to dial next hop: ", err = exc.msg
|
||||
mix_messages_error.inc(labelValues = ["Intermediate", "DIAL_FAILED"])
|
||||
except LPStreamError as exc:
|
||||
error "Failed to write to next hop: ", err = exc.msg
|
||||
mix_messages_error.inc(labelValues = ["Intermediate", "DIAL_FAILED"])
|
||||
of Duplicate:
|
||||
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "DUPLICATE"])
|
||||
of InvalidMAC:
|
||||
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_MAC"])
|
||||
|
||||
proc getMaxMessageSizeForCodec*(
|
||||
codec: string, numberOfSurbs: uint8 = 0
|
||||
): Result[int, string] =
|
||||
## Computes the maximum payload size (in bytes) available for a message when encoded
|
||||
## with the given `codec`
|
||||
## Returns an error if the codec length would cause it to exceeds the data capacity.
|
||||
let serializedMsg = MixMessage.init(@[], codec).serialize()
|
||||
if serializedMsg.len > DataSize:
|
||||
return err("cannot encode messages for this codec")
|
||||
return ok(DataSize - serializedMsg.len)
|
||||
|
||||
proc sendPacket(
|
||||
mixProto: MixProtocol,
|
||||
multiAddrs: MultiAddress,
|
||||
sphinxPacket: seq[byte],
|
||||
label: string,
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
## Send the wrapped message to the first mix node in the selected path
|
||||
|
||||
let (firstMixPeerId, firstMixAddr) = parseFullAddress(multiAddrs).valueOr:
|
||||
error "Invalid multiaddress", err = error
|
||||
mix_messages_error.inc(labelValues = [label, "NON_RECOVERABLE"])
|
||||
return
|
||||
|
||||
try:
|
||||
let nextHopConn =
|
||||
await mixProto.switch.dial(firstMixPeerId, @[firstMixAddr], @[MixProtocolID])
|
||||
defer:
|
||||
await nextHopConn.close()
|
||||
await nextHopConn.writeLp(sphinxPacket)
|
||||
except DialFailedError as exc:
|
||||
error "Failed to dial next hop: ",
|
||||
peerId = firstMixPeerId, address = firstMixAddr, err = exc.msg
|
||||
mix_messages_error.inc(labelValues = [label, "SEND_FAILED"])
|
||||
except LPStreamError as exc:
|
||||
error "Failed to write to next hop: ",
|
||||
peerId = firstMixPeerId, address = firstMixAddr, err = exc.msg
|
||||
mix_messages_error.inc(labelValues = [label, "SEND_FAILED"])
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
|
||||
mix_messages_forwarded.inc(labelValues = ["Entry"])
|
||||
|
||||
proc buildMessage(
|
||||
msg: seq[byte], codec: string, peerId: PeerId
|
||||
): Result[Message, (string, string)] =
|
||||
let
|
||||
mixMsg = MixMessage.init(msg, codec)
|
||||
serialized = mixMsg.serialize()
|
||||
|
||||
if serialized.len > DataSize:
|
||||
return err(("message size exceeds maximum payload size", "INVALID_SIZE"))
|
||||
|
||||
let
|
||||
paddedMsg = addPadding(serialized, peerId)
|
||||
serializedMsgChunk = paddedMsg.serialize()
|
||||
|
||||
ok(serializedMsgChunk)
|
||||
|
||||
## Represents the final target of a mixnet message.
|
||||
## contains the peer id and multiaddress of the destination node.
|
||||
type MixDestination* = object
|
||||
peerId: PeerId
|
||||
address: MultiAddress
|
||||
|
||||
proc init*(T: typedesc[MixDestination], peerId: PeerId, address: MultiAddress): T =
|
||||
## Initializes a destination object with the given peer id and multiaddress.
|
||||
T(peerId: peerId, address: address)
|
||||
|
||||
proc `$`*(d: MixDestination): string =
|
||||
$d.address & "/p2p/" & $d.peerId
|
||||
|
||||
proc anonymizeLocalProtocolSend*(
|
||||
mixProto: MixProtocol,
|
||||
incoming: AsyncQueue[seq[byte]],
|
||||
msg: seq[byte],
|
||||
codec: string,
|
||||
destination: MixDestination,
|
||||
numSurbs: uint8,
|
||||
) {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
mix_messages_recvd.inc(labelValues = ["Entry"])
|
||||
|
||||
var
|
||||
multiAddrs: seq[MultiAddress] = @[]
|
||||
publicKeys: seq[FieldElement] = @[]
|
||||
hop: seq[Hop] = @[]
|
||||
delay: seq[seq[byte]] = @[]
|
||||
exitPeerId: PeerId
|
||||
|
||||
# Select L mix nodes at random
|
||||
let numMixNodes = mixProto.pubNodeInfo.len
|
||||
var numAvailableNodes = numMixNodes
|
||||
|
||||
debug "Destination data", destination
|
||||
|
||||
if mixProto.pubNodeInfo.hasKey(destination.peerId):
|
||||
numAvailableNodes = numMixNodes - 1
|
||||
|
||||
if numAvailableNodes < PathLength:
|
||||
error "No. of public mix nodes less than path length.",
|
||||
numMixNodes = numAvailableNodes, pathLength = PathLength
|
||||
mix_messages_error.inc(labelValues = ["Entry", "LOW_MIX_POOL"])
|
||||
return
|
||||
|
||||
# Skip the destination peer
|
||||
var pubNodeInfoKeys =
|
||||
mixProto.pubNodeInfo.keys.toSeq().filterIt(it != destination.peerId)
|
||||
var availableIndices = toSeq(0 ..< pubNodeInfoKeys.len)
|
||||
|
||||
var i = 0
|
||||
while i < PathLength:
|
||||
let randomIndexPosition = cryptoRandomInt(mixProto.rng, availableIndices.len).valueOr:
|
||||
error "Failed to generate random number", err = error
|
||||
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
|
||||
return
|
||||
let selectedIndex = availableIndices[randomIndexPosition]
|
||||
let randPeerId = pubNodeInfoKeys[selectedIndex]
|
||||
availableIndices.del(randomIndexPosition)
|
||||
|
||||
# Last hop will be the exit node that will forward the request
|
||||
if i == PathLength - 1:
|
||||
exitPeerId = randPeerId
|
||||
|
||||
debug "Selected mix node: ", indexInPath = i, peerId = randPeerId
|
||||
|
||||
# Extract multiaddress, mix public key, and hop
|
||||
let (peerId, multiAddr, mixPubKey, _) =
|
||||
mixProto.pubNodeInfo.getOrDefault(randPeerId).get()
|
||||
multiAddrs.add(multiAddr)
|
||||
publicKeys.add(mixPubKey)
|
||||
|
||||
let multiAddrBytes = multiAddrToBytes(peerId, multiAddr).valueOr:
|
||||
error "Failed to convert multiaddress to bytes", err = error
|
||||
mix_messages_error.inc(labelValues = ["Entry", "INVALID_MIX_INFO"])
|
||||
#TODO: should we skip and pick a different node here??
|
||||
return
|
||||
|
||||
hop.add(Hop.init(multiAddrBytes))
|
||||
|
||||
# Compute delay
|
||||
let delayMillisec =
|
||||
if i != PathLength - 1:
|
||||
cryptoRandomInt(mixProto.rng, 3).valueOr:
|
||||
error "Failed to generate random number", err = error
|
||||
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
|
||||
return
|
||||
else:
|
||||
0 # Last hop does not require a delay
|
||||
|
||||
delay.add(@(delayMillisec.uint16.toBytesBE()))
|
||||
|
||||
i = i + 1
|
||||
|
||||
#Encode destination
|
||||
let destAddrBytes = multiAddrToBytes(destination.peerId, destination.address).valueOr:
|
||||
error "Failed to convert multiaddress to bytes", err = error
|
||||
mix_messages_error.inc(labelValues = ["Entry", "INVALID_DEST"])
|
||||
return
|
||||
let destHop = Hop.init(destAddrBytes)
|
||||
let message = buildMessage(msg, codec, mixProto.mixNodeInfo.peerId).valueOr:
|
||||
error "Error building message", err = error[0]
|
||||
mix_messages_error.inc(labelValues = ["Entry", error[1]])
|
||||
return
|
||||
|
||||
# Wrap in Sphinx packet
|
||||
let sphinxPacket = wrapInSphinxPacket(message, publicKeys, delay, hop, destHop).valueOr:
|
||||
error "Failed to wrap in sphinx packet", err = error
|
||||
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
|
||||
return
|
||||
|
||||
# Send the wrapped message to the first mix node in the selected path
|
||||
await mixProto.sendPacket(multiAddrs[0], sphinxPacket, "Entry")
|
||||
|
||||
proc init*(
|
||||
mixProto: MixProtocol,
|
||||
mixNodeInfo: MixNodeInfo,
|
||||
pubNodeInfo: Table[PeerId, MixPubInfo],
|
||||
switch: Switch,
|
||||
tagManager: TagManager = TagManager.new(),
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
) =
|
||||
mixProto.mixNodeInfo = mixNodeInfo
|
||||
mixProto.pubNodeInfo = pubNodeInfo
|
||||
mixProto.switch = switch
|
||||
mixProto.tagManager = tagManager
|
||||
|
||||
mixProto.exitLayer = ExitLayer.init(switch)
|
||||
mixProto.codecs = @[MixProtocolID]
|
||||
mixProto.rng = rng
|
||||
mixProto.handler = proc(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
await mixProto.handleMixNodeConnection(conn)
|
||||
except LPStreamError as e:
|
||||
debug "Stream error", conn = conn, err = e.msg
|
||||
|
||||
proc new*(
|
||||
T: typedesc[MixProtocol],
|
||||
mixNodeInfo: MixNodeInfo,
|
||||
pubNodeInfo: Table[PeerId, MixPubInfo],
|
||||
switch: Switch,
|
||||
tagManager: TagManager = TagManager.new(),
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
): T =
|
||||
let mixProto = new(T)
|
||||
mixProto.init(mixNodeInfo, pubNodeInfo, switch)
|
||||
mixProto
|
||||
|
||||
proc new*(
|
||||
T: typedesc[MixProtocol],
|
||||
index, numNodes: int,
|
||||
switch: Switch,
|
||||
nodeFolderInfoPath: string = ".",
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
): Result[T, string] =
|
||||
## Constructs a new `MixProtocol` instance for the mix node at `index`,
|
||||
## loading its private info from `nodeInfo` and the public info of all other nodes from `pubInfo`.
|
||||
let mixNodeInfo = MixNodeInfo.readFromFile(index, nodeFolderInfoPath / fmt"nodeInfo").valueOr:
|
||||
return err("Failed to load mix node info for index " & $index & " - err: " & error)
|
||||
|
||||
let pubNodeInfo = loadAllButIndexMixPubInfo(
|
||||
index, numNodes, nodeFolderInfoPath / fmt"pubInfo"
|
||||
).valueOr:
|
||||
return err("Failed to load mix pub info for index " & $index & " - err: " & error)
|
||||
|
||||
let mixProto =
|
||||
MixProtocol.new(mixNodeInfo, pubNodeInfo, switch, TagManager.new(), rng)
|
||||
|
||||
return ok(mixProto)
|
||||
|
||||
proc setNodePool*(
|
||||
mixProtocol: MixProtocol, mixNodeTable: Table[PeerId, MixPubInfo]
|
||||
) {.gcsafe, raises: [].} =
|
||||
mixProtocol.pubNodeInfo = mixNodeTable
|
||||
|
||||
proc getNodePoolSize*(mixProtocol: MixProtocol): int {.gcsafe, raises: [].} =
|
||||
mixProtocol.pubNodeInfo.len
|
||||
@@ -1,17 +1,17 @@
|
||||
import results, sugar, sequtils, strutils
|
||||
import ./serialization
|
||||
import stew/[base58, endians2]
|
||||
import stew/endians2
|
||||
import ../../[multicodec, multiaddress, peerid]
|
||||
|
||||
const
|
||||
PeerIdByteLen = 39 # ed25519 and secp256k1 multihash length
|
||||
MinMultiAddrComponentLen = 3
|
||||
MaxMultiAddrComponentLen = 6 # quic + circuit relay
|
||||
MinMultiAddrComponentLen = 2
|
||||
MaxMultiAddrComponentLen = 5 # quic + circuit relay
|
||||
|
||||
# TODO: Add support for ipv6, dns, dns4, ws/wss/sni support
|
||||
|
||||
proc multiAddrToBytes*(
|
||||
multiAddr: MultiAddress
|
||||
peerId: PeerId, multiAddr: MultiAddress
|
||||
): Result[seq[byte], string] {.raises: [].} =
|
||||
var ma = multiAddr
|
||||
let sma = multiAddr.items().toSeq()
|
||||
@@ -22,7 +22,7 @@ proc multiAddrToBytes*(
|
||||
|
||||
# Only IPV4 is supported
|
||||
let isCircuitRelay = ?ma.contains(multiCodec("p2p-circuit"))
|
||||
let baseP2PEndIdx = if isCircuitRelay: 4 else: 2
|
||||
let baseP2PEndIdx = if isCircuitRelay: 3 else: 1
|
||||
let baseAddr =
|
||||
try:
|
||||
if sma.len - 1 - baseP2PEndIdx < 0:
|
||||
@@ -53,27 +53,22 @@ proc multiAddrToBytes*(
|
||||
let portNum = ?catch(port.split('/')[2].parseInt()).mapErr(x => x.msg)
|
||||
res.add(portNum.uint16.toBytesBE())
|
||||
|
||||
# PeerID (39 bytes), if using circuit relay, this represents the relay server
|
||||
let p2pPart = ?ma.getPart(multiCodec("p2p"))
|
||||
let peerId = ?PeerId.init(?p2pPart.protoArgument()).mapErr(x => $x)
|
||||
|
||||
if peerId.data.len != PeerIdByteLen:
|
||||
return err("unsupported PeerId key type")
|
||||
res.add(peerId.data)
|
||||
|
||||
if isCircuitRelay:
|
||||
let dstPart = ?sma[^1]
|
||||
let dstPeerId = ?PeerId.init(?dstPart.protoArgument()).mapErr(x => $x)
|
||||
if dstPeerId.data.len != PeerIdByteLen:
|
||||
let relayIdPart = ?ma.getPart(multiCodec("p2p"))
|
||||
let relayId = ?PeerId.init(?relayIdPart.protoArgument()).mapErr(x => $x)
|
||||
if relayId.data.len != PeerIdByteLen:
|
||||
return err("unsupported PeerId key type")
|
||||
res.add(dstPeerId.data)
|
||||
res.add(relayId.data)
|
||||
|
||||
# PeerID (39 bytes)
|
||||
res.add(peerId.data)
|
||||
|
||||
if res.len > AddrSize:
|
||||
return err("Address must be <= " & $AddrSize & " bytes")
|
||||
|
||||
return ok(res & newSeq[byte](AddrSize - res.len))
|
||||
|
||||
proc bytesToMultiAddr*(bytes: openArray[byte]): Result[MultiAddress, string] =
|
||||
proc bytesToMultiAddr*(bytes: openArray[byte]): MaResult[(PeerId, MultiAddress)] =
|
||||
if bytes.len != AddrSize:
|
||||
return err("Address must be exactly " & $AddrSize & " bytes")
|
||||
|
||||
@@ -83,15 +78,18 @@ proc bytesToMultiAddr*(bytes: openArray[byte]): Result[MultiAddress, string] =
|
||||
quic = if bytes[4] == 1: "/quic-v1" else: ""
|
||||
port = uint16.fromBytesBE(bytes[5 .. 6])
|
||||
# peerId1 represents the circuit relay server addr if p2p-circuit addr, otherwise it's the node's actual peerId
|
||||
peerId1 = "/p2p/" & Base58.encode(bytes[7 ..< 46])
|
||||
peerId1Bytes = bytes[7 ..< 46]
|
||||
peerId2Bytes = bytes[7 + PeerIdByteLen ..< 7 + (PeerIdByteLen * 2)]
|
||||
# peerId2 will contain a value only if this is a p2p-circuit address
|
||||
peerId2 =
|
||||
if peerId2Bytes != newSeq[byte](PeerIdByteLen):
|
||||
"/p2p-circuit/p2p/" & Base58.encode(peerId2Bytes)
|
||||
else:
|
||||
""
|
||||
|
||||
return MultiAddress
|
||||
.init("/ip4/" & ip & "/" & protocol & "/" & $port & quic & peerId1 & peerId2)
|
||||
.mapErr(x => $x)
|
||||
let ma = ?MultiAddress.init("/ip4/" & ip & "/" & protocol & "/" & $port & quic)
|
||||
|
||||
return
|
||||
if peerId2Bytes != newSeq[byte](PeerIdByteLen):
|
||||
# Has circuit relay address
|
||||
let relayIdMa = ?MultiAddress.init(multiCodec("p2p"), peerId1Bytes)
|
||||
let p2pCircuitMa = ?MultiAddress.init(multiCodec("p2p-circuit"))
|
||||
let peerId = ?PeerId.init(peerId2Bytes).mapErr(x => $x)
|
||||
ok((peerId, ?(ma & relayIdMa & p2pCircuitMa).catch().mapErr(x => x.msg)))
|
||||
else:
|
||||
let peerId = ?PeerId.init(peerId1Bytes).mapErr(x => $x)
|
||||
ok((peerId, ma))
|
||||
|
||||
@@ -6,7 +6,6 @@ const
|
||||
k* = 16 # Security parameter
|
||||
r* = 5 # Maximum path length
|
||||
t* = 6 # t.k - combined length of next hop address and delay
|
||||
L* = 3 # Path length
|
||||
AlphaSize* = 32 # Group element
|
||||
BetaSize* = ((r * (t + 1)) + 1) * k # bytes
|
||||
GammaSize* = 16 # Output of HMAC-SHA-256, truncated to 16 bytes
|
||||
@@ -15,7 +14,7 @@ const
|
||||
AddrSize* = (t * k) - DelaySize # Address size
|
||||
PacketSize* = 4608 # Total packet size (from spec)
|
||||
MessageSize* = PacketSize - HeaderSize - k # Size of the message itself
|
||||
payloadSize* = MessageSize + k # Total payload size
|
||||
PayloadSize* = MessageSize + k # Total payload size
|
||||
SurbSize* = HeaderSize + k + AddrSize
|
||||
# Size of a surb packet inside the message payload
|
||||
SurbLenSize* = 1 # Size of the field storing the number of surbs
|
||||
@@ -70,8 +69,8 @@ proc serialize*(message: Message): seq[byte] =
|
||||
proc deserialize*(
|
||||
T: typedesc[Message], serializedMessage: openArray[byte]
|
||||
): Result[T, string] =
|
||||
if len(serializedMessage) != payloadSize:
|
||||
return err("Serialized message must be exactly " & $payloadSize & " bytes")
|
||||
if len(serializedMessage) != PayloadSize:
|
||||
return err("Serialized message must be exactly " & $PayloadSize & " bytes")
|
||||
return ok(serializedMessage[k ..^ 1])
|
||||
|
||||
type Hop* = object
|
||||
@@ -139,6 +138,23 @@ proc serialize*(info: RoutingInfo): seq[byte] =
|
||||
|
||||
return addrBytes & info.Delay & info.Gamma & info.Beta
|
||||
|
||||
proc readBytes(
|
||||
data: openArray[byte], offset: var int, readSize: Opt[int] = Opt.none(int)
|
||||
): Result[seq[byte], string] =
|
||||
if data.len < offset:
|
||||
return err("not enough data")
|
||||
|
||||
readSize.withValue(size):
|
||||
if data.len < offset + size:
|
||||
return err("not enough data")
|
||||
let slice = data[offset ..< offset + size]
|
||||
offset += size
|
||||
return ok(slice)
|
||||
|
||||
let slice = data[offset .. ^1]
|
||||
offset = data.len
|
||||
return ok(slice)
|
||||
|
||||
proc deserialize*(T: typedesc[RoutingInfo], data: openArray[byte]): Result[T, string] =
|
||||
if len(data) != BetaSize + ((t + 1) * k):
|
||||
return err("Data must be exactly " & $(BetaSize + ((t + 1) * k)) & " bytes")
|
||||
@@ -146,13 +162,13 @@ proc deserialize*(T: typedesc[RoutingInfo], data: openArray[byte]): Result[T, st
|
||||
let hop = Hop.deserialize(data[0 .. AddrSize - 1]).valueOr:
|
||||
return err("Deserialize hop error: " & error)
|
||||
|
||||
var offset: int = AddrSize
|
||||
return ok(
|
||||
RoutingInfo(
|
||||
Addr: hop,
|
||||
Delay: data[AddrSize .. (AddrSize + DelaySize - 1)],
|
||||
Gamma: data[(AddrSize + DelaySize) .. (AddrSize + DelaySize + GammaSize - 1)],
|
||||
Beta:
|
||||
data[(AddrSize + DelaySize + GammaSize) .. (((r * (t + 1)) + t + 2) * k) - 1],
|
||||
Delay: ?data.readBytes(offset, Opt.some(DelaySize)),
|
||||
Gamma: ?data.readBytes(offset, Opt.some(GammaSize)),
|
||||
Beta: ?data.readBytes(offset, Opt.some(BetaSize)),
|
||||
)
|
||||
)
|
||||
|
||||
@@ -183,7 +199,7 @@ type
|
||||
|
||||
Key* = seq[byte]
|
||||
|
||||
I* = array[SurbIdLen, byte]
|
||||
SURBIdentifier* = array[SurbIdLen, byte]
|
||||
|
||||
SURB* = object
|
||||
hop*: Hop
|
||||
@@ -201,23 +217,6 @@ proc serializeMessageWithSURBs*(
|
||||
surbs.mapIt(it.hop.serialize() & it.header.serialize() & it.key).concat()
|
||||
ok(byte(surbs.len) & surbBytes & msg)
|
||||
|
||||
proc readBytes(
|
||||
data: seq[byte], offset: var int, readSize: Opt[int] = Opt.none(int)
|
||||
): Result[seq[byte], string] =
|
||||
if data.len < offset:
|
||||
return err("not enough data")
|
||||
|
||||
readSize.withValue(size):
|
||||
if data.len < offset + size:
|
||||
return err("not enough data")
|
||||
let slice = data[offset ..< offset + size]
|
||||
offset += size
|
||||
return ok(slice)
|
||||
|
||||
let slice = data[offset .. ^1]
|
||||
offset = data.len
|
||||
return ok(slice)
|
||||
|
||||
proc extractSURBs*(msg: seq[byte]): Result[(seq[SURB], seq[byte]), string] =
|
||||
var offset = 0
|
||||
let surbsLenBytes = ?readBytes(msg, offset, Opt.some(1))
|
||||
|
||||
@@ -1,11 +1,17 @@
|
||||
import results, sequtils, stew/endians2
|
||||
import ./[crypto, curve25519, serialization, tag_manager]
|
||||
import ../../crypto/crypto
|
||||
import ../../utils/sequninit
|
||||
|
||||
const PathLength* = 3 # Path length (L)
|
||||
const PaddingLength = (((t + 1) * (r - PathLength)) + 1) * k
|
||||
|
||||
type ProcessingStatus* = enum
|
||||
Exit # Packet processed successfully at exit
|
||||
Intermediate # Packet processed successfully at intermediate node
|
||||
Duplicate # Packet was discarded due to duplicate tag
|
||||
InvalidMAC # Packet was discarded due to MAC verification failure
|
||||
Exit
|
||||
Intermediate
|
||||
Reply
|
||||
Duplicate
|
||||
InvalidMAC
|
||||
|
||||
proc computeAlpha(
|
||||
publicKeys: openArray[FieldElement]
|
||||
@@ -79,16 +85,12 @@ proc computeFillerStrings(s: seq[seq[byte]]): Result[seq[byte], string] =
|
||||
|
||||
return ok(filler)
|
||||
|
||||
const paddingLength = (((t + 1) * (r - L)) + 1) * k
|
||||
|
||||
# Function to compute:
|
||||
|
||||
proc computeBetaGamma(
|
||||
s: seq[seq[byte]],
|
||||
hop: openArray[Hop],
|
||||
delay: openArray[seq[byte]],
|
||||
destHop: Hop,
|
||||
id: I,
|
||||
id: SURBIdentifier,
|
||||
): Result[tuple[beta: seq[byte], gamma: seq[byte]], string] =
|
||||
## Calculates the following elements:
|
||||
## - Beta: The nested encrypted routing information. It encodes the next hop address, the forwarding delay, integrity check Gamma for the next hop, and the Beta for subsequent hops.
|
||||
@@ -112,7 +114,7 @@ proc computeBetaGamma(
|
||||
# Compute Beta and Gamma
|
||||
if i == sLen - 1:
|
||||
let destBytes = destHop.serialize()
|
||||
let destPadding = destBytes & delay[i] & @id & newSeq[byte](paddingLength)
|
||||
let destPadding = destBytes & delay[i] & @id & newSeq[byte](PaddingLength)
|
||||
|
||||
let aes = aes_ctr(beta_aes_key, beta_iv, destPadding)
|
||||
|
||||
@@ -149,6 +151,70 @@ proc computeDelta(s: seq[seq[byte]], msg: Message): Result[seq[byte], string] =
|
||||
|
||||
return ok(delta)
|
||||
|
||||
proc createSURB*(
|
||||
publicKeys: openArray[FieldElement],
|
||||
delay: openArray[seq[byte]],
|
||||
hops: openArray[Hop],
|
||||
id: SURBIdentifier,
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
): Result[SURB, string] =
|
||||
if id == default(SURBIdentifier):
|
||||
return err("id should be initialized")
|
||||
|
||||
# Compute alpha and shared secrets
|
||||
let (alpha_0, s) = computeAlpha(publicKeys).valueOr:
|
||||
return err("Error in alpha generation: " & error)
|
||||
|
||||
# Compute beta and gamma
|
||||
let (beta_0, gamma_0) = computeBetaGamma(s, hops, delay, Hop(), id).valueOr:
|
||||
return err("Error in beta and gamma generation: " & error)
|
||||
|
||||
# Generate key
|
||||
var key = newSeqUninit[byte](k)
|
||||
rng[].generate(key)
|
||||
|
||||
return ok(
|
||||
SURB(
|
||||
hop: hops[0],
|
||||
header: Header.init(alpha_0, beta_0, gamma_0),
|
||||
secret: Opt.some(s),
|
||||
key: key,
|
||||
)
|
||||
)
|
||||
|
||||
proc useSURB*(surb: SURB, msg: Message): SphinxPacket =
|
||||
# Derive AES key and IV
|
||||
let
|
||||
delta_aes_key = deriveKeyMaterial("delta_aes_key", surb.key).kdf()
|
||||
delta_iv = deriveKeyMaterial("delta_iv", surb.key).kdf()
|
||||
|
||||
# Compute Delta
|
||||
let serializedMsg = msg.serialize()
|
||||
let delta = aes_ctr(delta_aes_key, delta_iv, serializedMsg)
|
||||
|
||||
return SphinxPacket.init(surb.header, delta)
|
||||
|
||||
proc processReply*(
|
||||
key: seq[byte], s: seq[seq[byte]], delta_prime: seq[byte]
|
||||
): Result[seq[byte], string] =
|
||||
var delta = delta_prime[0 ..^ 1]
|
||||
|
||||
var key_prime = key
|
||||
for i in 0 .. s.len:
|
||||
if i != 0:
|
||||
key_prime = s[i - 1]
|
||||
|
||||
let
|
||||
delta_aes_key = deriveKeyMaterial("delta_aes_key", key_prime).kdf()
|
||||
delta_iv = deriveKeyMaterial("delta_iv", key_prime).kdf()
|
||||
|
||||
delta = aes_ctr(delta_aes_key, delta_iv, delta)
|
||||
|
||||
let deserializeMsg = Message.deserialize(delta).valueOr:
|
||||
return err("Message deserialization error: " & error)
|
||||
|
||||
return ok(deserializeMsg)
|
||||
|
||||
proc wrapInSphinxPacket*(
|
||||
msg: Message,
|
||||
publicKeys: openArray[FieldElement],
|
||||
@@ -161,7 +227,9 @@ proc wrapInSphinxPacket*(
|
||||
return err("Error in alpha generation: " & error)
|
||||
|
||||
# Compute beta and gamma
|
||||
let (beta_0, gamma_0) = computeBetaGamma(s, hop, delay, destHop, default(I)).valueOr:
|
||||
let (beta_0, gamma_0) = computeBetaGamma(
|
||||
s, hop, delay, destHop, default(SURBIdentifier)
|
||||
).valueOr:
|
||||
return err("Error in beta and gamma generation: " & error)
|
||||
|
||||
# Compute delta
|
||||
@@ -184,9 +252,27 @@ type ProcessedSphinxPacket* = object
|
||||
nextHop*: Hop
|
||||
delayMs*: int
|
||||
serializedSphinxPacket*: seq[byte]
|
||||
of ProcessingStatus.Reply:
|
||||
id*: SURBIdentifier
|
||||
delta_prime*: seq[byte]
|
||||
else:
|
||||
discard
|
||||
|
||||
proc isZeros(data: seq[byte], startIdx: int, endIdx: int): bool =
|
||||
doAssert 0 <= startIdx and endIdx < data.len and startIdx <= endIdx
|
||||
for i in startIdx .. endIdx:
|
||||
if data[i] != 0:
|
||||
return false
|
||||
return true
|
||||
|
||||
template extractSurbId(data: seq[byte]): SURBIdentifier =
|
||||
const startIndex = t * k
|
||||
const endIndex = startIndex + SurbIdLen - 1
|
||||
doAssert data.len > startIndex and endIndex < data.len
|
||||
var id: SURBIdentifier
|
||||
copyMem(addr id[0], addr data[startIndex], SurbIdLen)
|
||||
id
|
||||
|
||||
proc processSphinxPacket*(
|
||||
sphinxPacket: SphinxPacket, privateKey: FieldElement, tm: var TagManager
|
||||
): Result[ProcessedSphinxPacket, string] =
|
||||
@@ -228,19 +314,16 @@ proc processSphinxPacket*(
|
||||
let delta_prime = aes_ctr(delta_aes_key, delta_iv, payload)
|
||||
|
||||
# Compute B
|
||||
var zeroPadding = newSeq[byte]((t + 1) * k)
|
||||
|
||||
let zeroPadding = newSeq[byte]((t + 1) * k)
|
||||
let B = aes_ctr(beta_aes_key, beta_iv, beta & zeroPadding)
|
||||
|
||||
# Check if B has the required prefix for the original message
|
||||
zeroPadding = newSeq[byte](paddingLength)
|
||||
|
||||
if B[((t + 1) * k) .. ((t + 1) * k) + paddingLength - 1] == zeroPadding:
|
||||
if B.isZeros((t + 1) * k, ((t + 1) * k) + PaddingLength - 1):
|
||||
let hop = Hop.deserialize(B[0 .. AddrSize - 1]).valueOr:
|
||||
return err(error)
|
||||
|
||||
if B[AddrSize .. ((t + 1) * k) - 1] == newSeq[byte](k + 2):
|
||||
if delta_prime[0 .. (k - 1)] == newSeq[byte](k):
|
||||
if B.isZeros(AddrSize, ((t + 1) * k) - 1):
|
||||
if delta_prime.isZeros(0, k - 1):
|
||||
let msg = Message.deserialize(delta_prime).valueOr:
|
||||
return err("Message deserialization error: " & error)
|
||||
return ok(
|
||||
@@ -250,9 +333,12 @@ proc processSphinxPacket*(
|
||||
)
|
||||
else:
|
||||
return err("delta_prime should be all zeros")
|
||||
elif B[0 .. (t * k) - 1] == newSeq[byte](t * k):
|
||||
# TODO: handle REPLY case
|
||||
discard
|
||||
elif B.isZeros(0, (t * k) - 1):
|
||||
return ok(
|
||||
ProcessedSphinxPacket(
|
||||
status: Reply, id: B.extractSurbId(), delta_prime: delta_prime
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Extract routing information from B
|
||||
let routingInfo = RoutingInfo.deserialize(B).valueOr:
|
||||
|
||||
@@ -150,57 +150,6 @@ proc createStreamServer*[T](
|
||||
except CatchableError as exc:
|
||||
raise newException(LPError, "failed simpler createStreamServer: " & exc.msg, exc)
|
||||
|
||||
proc createAsyncSocket*(ma: MultiAddress): AsyncFD {.raises: [ValueError, LPError].} =
|
||||
## Create new asynchronous socket using MultiAddress' ``ma`` socket type and
|
||||
## protocol information.
|
||||
##
|
||||
## Returns ``asyncInvalidSocket`` on error.
|
||||
##
|
||||
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
|
||||
##
|
||||
|
||||
var
|
||||
socktype: SockType = SockType.SOCK_STREAM
|
||||
protocol: Protocol = Protocol.IPPROTO_TCP
|
||||
|
||||
let address = initTAddress(ma).tryGet()
|
||||
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||
if ma[1].tryGet().protoCode().tryGet() == multiCodec("udp"):
|
||||
socktype = SockType.SOCK_DGRAM
|
||||
protocol = Protocol.IPPROTO_UDP
|
||||
elif ma[1].tryGet().protoCode().tryGet() == multiCodec("tcp"):
|
||||
socktype = SockType.SOCK_STREAM
|
||||
protocol = Protocol.IPPROTO_TCP
|
||||
elif address.family in {AddressFamily.Unix}:
|
||||
socktype = SockType.SOCK_STREAM
|
||||
protocol = cast[Protocol](0)
|
||||
else:
|
||||
return asyncInvalidSocket
|
||||
|
||||
try:
|
||||
createAsyncSocket(address.getDomain(), socktype, protocol)
|
||||
except CatchableError as exc:
|
||||
raise newException(
|
||||
LPError, "Convert exception to LPError in createAsyncSocket: " & exc.msg, exc
|
||||
)
|
||||
|
||||
proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool {.raises: [LPError].} =
|
||||
## Bind socket ``sock`` to MultiAddress ``ma``.
|
||||
##
|
||||
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
|
||||
##
|
||||
|
||||
var
|
||||
saddr: Sockaddr_storage
|
||||
slen: SockLen
|
||||
|
||||
let address = initTAddress(ma).tryGet()
|
||||
toSAddr(address, saddr, slen)
|
||||
if bindSocket(SocketHandle(sock), cast[ptr SockAddr](addr saddr), slen) == 0:
|
||||
result = true
|
||||
else:
|
||||
result = false
|
||||
|
||||
proc getLocalAddress*(sock: AsyncFD): TransportAddress =
|
||||
## Retrieve local socket ``sock`` address.
|
||||
##
|
||||
|
||||
@@ -1,123 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Copyright (c) 2018-2020 Status Research & Development GmbH. Licensed under
|
||||
# either of:
|
||||
# - Apache License, version 2.0
|
||||
# - MIT license
|
||||
# at your option. This file may not be copied, modified, or distributed except
|
||||
# according to those terms.
|
||||
|
||||
set -e
|
||||
|
||||
force=false
|
||||
verbose=false
|
||||
CACHE_DIR=""
|
||||
LIBP2P_COMMIT="124530a3"
|
||||
|
||||
while [[ "$#" -gt 0 ]]; do
|
||||
case "$1" in
|
||||
-f|--force) force=true ;;
|
||||
-v|--verbose) verbose=true ;;
|
||||
-h|--help)
|
||||
echo "Usage: $0 [-f|--force] [-v|--verbose] [CACHE_DIR] [COMMIT]"
|
||||
exit 0
|
||||
;;
|
||||
*)
|
||||
# First non-option is CACHE_DIR, second is LIBP2P_COMMIT
|
||||
if [[ -z "$CACHE_DIR" ]]; then
|
||||
CACHE_DIR="$1"
|
||||
elif [[ "$LIBP2P_COMMIT" == "124530a3" ]]; then
|
||||
LIBP2P_COMMIT="$1"
|
||||
else
|
||||
echo "Unknown argument: $1"
|
||||
exit 1
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
shift
|
||||
done
|
||||
|
||||
SUBREPO_DIR="vendor/go/src/github.com/libp2p/go-libp2p-daemon"
|
||||
if [[ ! -e "$SUBREPO_DIR" ]]; then
|
||||
SUBREPO_DIR="go-libp2p-daemon"
|
||||
rm -rf "$SUBREPO_DIR"
|
||||
git clone -q https://github.com/libp2p/go-libp2p-daemon
|
||||
cd "$SUBREPO_DIR"
|
||||
git checkout -q "$LIBP2P_COMMIT"
|
||||
cd ..
|
||||
fi
|
||||
|
||||
## env vars
|
||||
[[ -z "$BUILD_MSG" ]] && BUILD_MSG="Building p2pd ${LIBP2P_COMMIT}"
|
||||
|
||||
# Windows detection
|
||||
if uname | grep -qiE "mingw|msys"; then
|
||||
EXE_SUFFIX=".exe"
|
||||
# otherwise it fails in AppVeyor due to https://github.com/git-for-windows/git/issues/2495
|
||||
GIT_TIMESTAMP_ARG="--date=unix" # available since Git 2.9.4
|
||||
else
|
||||
EXE_SUFFIX=""
|
||||
GIT_TIMESTAMP_ARG="--date=format-local:%s" # available since Git 2.7.0
|
||||
fi
|
||||
|
||||
TARGET_DIR="$(go env GOPATH)/bin"
|
||||
TARGET_BINARY="${TARGET_DIR}/p2pd${EXE_SUFFIX}"
|
||||
|
||||
target_needs_rebuilding() {
|
||||
REBUILD=0
|
||||
NO_REBUILD=1
|
||||
|
||||
if [[ -n "$CACHE_DIR" && -e "${CACHE_DIR}/p2pd${EXE_SUFFIX}" ]]; then
|
||||
mkdir -p "${TARGET_DIR}"
|
||||
cp -a "$CACHE_DIR"/* "${TARGET_DIR}/"
|
||||
fi
|
||||
|
||||
# compare the built commit's timestamp to the date of the last commit (keep in mind that Git doesn't preserve file timestamps)
|
||||
if [[ -e "${TARGET_DIR}/timestamp" && $(cat "${TARGET_DIR}/timestamp") -eq $(cd "$SUBREPO_DIR"; git log --pretty=format:%cd -n 1 ${GIT_TIMESTAMP_ARG}) ]]; then
|
||||
return $NO_REBUILD
|
||||
else
|
||||
return $REBUILD
|
||||
fi
|
||||
}
|
||||
|
||||
build_target() {
|
||||
echo -e "$BUILD_MSG"
|
||||
|
||||
pushd "$SUBREPO_DIR"
|
||||
# Go module downloads can fail randomly in CI VMs, so retry them a few times
|
||||
MAX_RETRIES=5
|
||||
CURR=0
|
||||
while [[ $CURR -lt $MAX_RETRIES ]]; do
|
||||
FAILED=0
|
||||
go get ./... && break || FAILED=1
|
||||
CURR=$(( CURR + 1 ))
|
||||
if $verbose; then
|
||||
echo "retry #${CURR}"
|
||||
fi
|
||||
done
|
||||
if [[ $FAILED == 1 ]]; then
|
||||
echo "Error: still fails after retrying ${MAX_RETRIES} times."
|
||||
exit 1
|
||||
fi
|
||||
go install ./...
|
||||
|
||||
# record the last commit's timestamp
|
||||
git log --pretty=format:%cd -n 1 ${GIT_TIMESTAMP_ARG} > "${TARGET_DIR}/timestamp"
|
||||
|
||||
popd
|
||||
|
||||
# update the CI cache
|
||||
if [[ -n "$CACHE_DIR" ]]; then
|
||||
rm -rf "$CACHE_DIR"
|
||||
mkdir "$CACHE_DIR"
|
||||
cp -a "$TARGET_DIR"/* "$CACHE_DIR"/
|
||||
fi
|
||||
echo "Binary built successfully: $TARGET_BINARY"
|
||||
}
|
||||
|
||||
if $force || target_needs_rebuilding; then
|
||||
build_target
|
||||
else
|
||||
echo "No rebuild needed."
|
||||
fi
|
||||
|
||||
@@ -1,665 +0,0 @@
|
||||
import chronos, chronicles, stew/byteutils
|
||||
import helpers
|
||||
import ../libp2p
|
||||
import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto]
|
||||
import ../libp2p/protocols/connectivity/relay/[relay, client, utils]
|
||||
|
||||
type
|
||||
SwitchCreator = proc(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov = proc(config: TransportConfig): Transport =
|
||||
TcpTransport.new({}, config.upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.gcsafe, raises: [LPError].}
|
||||
DaemonPeerInfo = daemonapi.PeerInfo
|
||||
|
||||
proc writeLp(s: StreamTransport, msg: string | seq[byte]): Future[int] {.gcsafe.} =
|
||||
## write lenght prefixed
|
||||
var buf = initVBuffer()
|
||||
buf.writeSeq(msg)
|
||||
buf.finish()
|
||||
result = s.write(buf.buffer)
|
||||
|
||||
proc readLp(s: StreamTransport): Future[seq[byte]] {.async.} =
|
||||
## read length prefixed msg
|
||||
var
|
||||
size: uint
|
||||
length: int
|
||||
res: VarintResult[void]
|
||||
result = newSeq[byte](10)
|
||||
|
||||
for i in 0 ..< len(result):
|
||||
await s.readExactly(addr result[i], 1)
|
||||
res = LP.getUVarint(result.toOpenArray(0, i), length, size)
|
||||
if res.isOk():
|
||||
break
|
||||
res.expect("Valid varint")
|
||||
result.setLen(size)
|
||||
if size > 0.uint:
|
||||
await s.readExactly(addr result[0], int(size))
|
||||
|
||||
proc testPubSubDaemonPublish(
|
||||
gossip: bool = false, count: int = 1, swCreator: SwitchCreator
|
||||
) {.async.} =
|
||||
var pubsubData = "TEST MESSAGE"
|
||||
var testTopic = "test-topic"
|
||||
var msgData = pubsubData.toBytes()
|
||||
|
||||
var flags = {PSFloodSub}
|
||||
if gossip:
|
||||
flags = {PSGossipSub}
|
||||
|
||||
let daemonNode = await newDaemonApi(flags)
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let nativeNode = swCreator()
|
||||
|
||||
let pubsub =
|
||||
if gossip:
|
||||
GossipSub.init(switch = nativeNode).PubSub
|
||||
else:
|
||||
FloodSub.init(switch = nativeNode).PubSub
|
||||
|
||||
nativeNode.mount(pubsub)
|
||||
|
||||
await nativeNode.start()
|
||||
await pubsub.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
var finished = false
|
||||
var times = 0
|
||||
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
|
||||
let smsg = string.fromBytes(data)
|
||||
check smsg == pubsubData
|
||||
times.inc()
|
||||
if times >= count and not finished:
|
||||
finished = true
|
||||
|
||||
await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
|
||||
proc pubsubHandler(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async: (raises: [CatchableError]).} =
|
||||
result = true # don't cancel subscription
|
||||
|
||||
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
|
||||
pubsub.subscribe(testTopic, nativeHandler)
|
||||
await sleepAsync(3.seconds)
|
||||
|
||||
proc publisher() {.async.} =
|
||||
while not finished:
|
||||
await daemonNode.pubsubPublish(testTopic, msgData)
|
||||
await sleepAsync(250.millis)
|
||||
|
||||
await wait(publisher(), 5.minutes) # should be plenty of time
|
||||
|
||||
await nativeNode.stop()
|
||||
await pubsub.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
proc testPubSubNodePublish(
|
||||
gossip: bool = false, count: int = 1, swCreator: SwitchCreator
|
||||
) {.async.} =
|
||||
var pubsubData = "TEST MESSAGE"
|
||||
var testTopic = "test-topic"
|
||||
var msgData = pubsubData.toBytes()
|
||||
|
||||
var flags = {PSFloodSub}
|
||||
if gossip:
|
||||
flags = {PSGossipSub}
|
||||
|
||||
let daemonNode = await newDaemonApi(flags)
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let nativeNode = swCreator()
|
||||
|
||||
let pubsub =
|
||||
if gossip:
|
||||
GossipSub.init(switch = nativeNode).PubSub
|
||||
else:
|
||||
FloodSub.init(switch = nativeNode).PubSub
|
||||
|
||||
nativeNode.mount(pubsub)
|
||||
|
||||
await nativeNode.start()
|
||||
await pubsub.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
|
||||
var times = 0
|
||||
var finished = false
|
||||
proc pubsubHandler(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async: (raises: [CatchableError]).} =
|
||||
let smsg = string.fromBytes(message.data)
|
||||
check smsg == pubsubData
|
||||
times.inc()
|
||||
if times >= count and not finished:
|
||||
finished = true
|
||||
result = true # don't cancel subscription
|
||||
|
||||
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
|
||||
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
|
||||
discard
|
||||
|
||||
pubsub.subscribe(testTopic, nativeHandler)
|
||||
await sleepAsync(3.seconds)
|
||||
|
||||
proc publisher() {.async.} =
|
||||
while not finished:
|
||||
discard await pubsub.publish(testTopic, msgData)
|
||||
await sleepAsync(250.millis)
|
||||
|
||||
await wait(publisher(), 5.minutes) # should be plenty of time
|
||||
|
||||
check finished
|
||||
await nativeNode.stop()
|
||||
await pubsub.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
|
||||
suite "Interop using " & name:
|
||||
# TODO: chronos transports are leaking,
|
||||
# but those are tracked for both the daemon
|
||||
# and libp2p, so not sure which one it is,
|
||||
# need to investigate more
|
||||
# teardown:
|
||||
# checkTrackers()
|
||||
|
||||
# TODO: this test is failing sometimes on windows
|
||||
# For some reason we receive EOF before test 4 sometimes
|
||||
|
||||
asyncTest "native -> daemon multiple reads and writes":
|
||||
var protos = @["/test-stream"]
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
await nativeNode.start()
|
||||
let daemonNode = await newDaemonApi()
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
|
||||
var testFuture = newFuture[void]("test.future")
|
||||
proc daemonHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
check string.fromBytes(await stream.transp.readLp()) == "test 1"
|
||||
discard await stream.transp.writeLp("test 2")
|
||||
check string.fromBytes(await stream.transp.readLp()) == "test 3"
|
||||
discard await stream.transp.writeLp("test 4")
|
||||
testFuture.complete()
|
||||
|
||||
await daemonNode.addHandler(protos, daemonHandler)
|
||||
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
|
||||
await conn.writeLp("test 1")
|
||||
check "test 2" == string.fromBytes((await conn.readLp(1024)))
|
||||
|
||||
await conn.writeLp("test 3")
|
||||
check "test 4" == string.fromBytes((await conn.readLp(1024)))
|
||||
|
||||
await wait(testFuture, 10.secs)
|
||||
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
asyncTest "native -> daemon connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
# We are preparing expect string, which should be prefixed with varint
|
||||
# length and do not have `\r\n` suffix, because we going to use
|
||||
# readLine().
|
||||
var buffer = initVBuffer()
|
||||
buffer.writeSeq(test & "\r\n")
|
||||
buffer.finish()
|
||||
var expect = newString(len(buffer) - 2)
|
||||
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
await nativeNode.start()
|
||||
|
||||
let daemonNode = await newDaemonApi()
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc daemonHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
# We should perform `readLp()` instead of `readLine()`. `readLine()`
|
||||
# here reads actually length prefixed string.
|
||||
var line = await stream.transp.readLine()
|
||||
check line == expect
|
||||
testFuture.complete(line)
|
||||
await stream.close()
|
||||
|
||||
await daemonNode.addHandler(protos, daemonHandler)
|
||||
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
|
||||
await conn.writeLp(test & "\r\n")
|
||||
check expect == (await wait(testFuture, 10.secs))
|
||||
|
||||
await conn.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "daemon -> native connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc nativeHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
var line = string.fromBytes(await conn.readLp(1024))
|
||||
check line == test
|
||||
testFuture.complete(line)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi()
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
discard await stream.transp.writeLp(test)
|
||||
|
||||
check test == (await wait(testFuture, 10.secs))
|
||||
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
asyncTest "native -> daemon websocket connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc nativeHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
var line = string.fromBytes(await conn.readLp(1024))
|
||||
check line == test
|
||||
testFuture.complete(line)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
|
||||
|
||||
let nativeNode = swCreator(
|
||||
ma = wsAddress,
|
||||
prov = proc(config: TransportConfig): Transport =
|
||||
WsTransport.new(config.upgr),
|
||||
)
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
discard await stream.transp.writeLp(test)
|
||||
|
||||
check test == (await wait(testFuture, 10.secs))
|
||||
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
asyncTest "daemon -> native websocket connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
# We are preparing expect string, which should be prefixed with varint
|
||||
# length and do not have `\r\n` suffix, because we going to use
|
||||
# readLine().
|
||||
var buffer = initVBuffer()
|
||||
buffer.writeSeq(test & "\r\n")
|
||||
buffer.finish()
|
||||
var expect = newString(len(buffer) - 2)
|
||||
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
|
||||
|
||||
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
|
||||
let nativeNode = SwitchBuilder
|
||||
.new()
|
||||
.withAddress(wsAddress)
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withWsTransport()
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
await nativeNode.start()
|
||||
|
||||
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc daemonHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
# We should perform `readLp()` instead of `readLine()`. `readLine()`
|
||||
# here reads actually length prefixed string.
|
||||
var line = await stream.transp.readLine()
|
||||
check line == expect
|
||||
testFuture.complete(line)
|
||||
await stream.close()
|
||||
|
||||
await daemonNode.addHandler(protos, daemonHandler)
|
||||
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
|
||||
await conn.writeLp(test & "\r\n")
|
||||
check expect == (await wait(testFuture, 10.secs))
|
||||
|
||||
await conn.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "daemon -> multiple reads and writes":
|
||||
var protos = @["/test-stream"]
|
||||
|
||||
var testFuture = newFuture[void]("test.future")
|
||||
proc nativeHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
check "test 1" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("test 2".toBytes())
|
||||
|
||||
check "test 3" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("test 4".toBytes())
|
||||
|
||||
testFuture.complete()
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi()
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
|
||||
asyncDiscard stream.transp.writeLp("test 1")
|
||||
check "test 2" == string.fromBytes(await stream.transp.readLp())
|
||||
|
||||
asyncDiscard stream.transp.writeLp("test 3")
|
||||
check "test 4" == string.fromBytes(await stream.transp.readLp())
|
||||
|
||||
await wait(testFuture, 10.secs)
|
||||
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "read write multiple":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var count = 0
|
||||
var testFuture = newFuture[int]("test.future")
|
||||
proc nativeHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
while count < 10:
|
||||
var line = string.fromBytes(await conn.readLp(1024))
|
||||
check line == test
|
||||
await conn.writeLp(test.toBytes())
|
||||
count.inc()
|
||||
|
||||
testFuture.complete(count)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi()
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
|
||||
var count2 = 0
|
||||
while count2 < 10:
|
||||
discard await stream.transp.writeLp(test)
|
||||
let line = await stream.transp.readLp()
|
||||
check test == string.fromBytes(line)
|
||||
inc(count2)
|
||||
|
||||
check 10 == (await wait(testFuture, 1.minutes))
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "floodsub: daemon publish one":
|
||||
await testPubSubDaemonPublish(swCreator = swCreator)
|
||||
|
||||
asyncTest "floodsub: daemon publish many":
|
||||
await testPubSubDaemonPublish(count = 10, swCreator = swCreator)
|
||||
|
||||
asyncTest "gossipsub: daemon publish one":
|
||||
await testPubSubDaemonPublish(gossip = true, swCreator = swCreator)
|
||||
|
||||
asyncTest "gossipsub: daemon publish many":
|
||||
await testPubSubDaemonPublish(gossip = true, count = 10, swCreator = swCreator)
|
||||
|
||||
asyncTest "floodsub: node publish one":
|
||||
await testPubSubNodePublish(swCreator = swCreator)
|
||||
|
||||
asyncTest "floodsub: node publish many":
|
||||
await testPubSubNodePublish(count = 10, swCreator = swCreator)
|
||||
|
||||
asyncTest "gossipsub: node publish one":
|
||||
await testPubSubNodePublish(gossip = true, swCreator = swCreator)
|
||||
|
||||
asyncTest "gossipsub: node publish many":
|
||||
await testPubSubNodePublish(gossip = true, count = 10, swCreator = swCreator)
|
||||
|
||||
proc relayInteropTests*(name: string, relayCreator: SwitchCreator) =
|
||||
suite "Interop relay using " & name:
|
||||
asyncTest "NativeSrc -> NativeRelay -> DaemonDst":
|
||||
let closeBlocker = newFuture[void]()
|
||||
let daemonFinished = newFuture[void]()
|
||||
# TODO: This Future blocks the daemonHandler after sending the last message.
|
||||
# It exists because there's a strange behavior where stream.close sends
|
||||
# a Rst instead of Fin. We should investigate this at some point.
|
||||
proc daemonHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
check "line1" == string.fromBytes(await stream.transp.readLp())
|
||||
discard await stream.transp.writeLp("line2")
|
||||
check "line3" == string.fromBytes(await stream.transp.readLp())
|
||||
discard await stream.transp.writeLp("line4")
|
||||
await closeBlocker
|
||||
await stream.close()
|
||||
daemonFinished.complete()
|
||||
|
||||
let
|
||||
maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
src = relayCreator(maSrc, relay = RelayClient.new(circuitRelayV1 = true))
|
||||
rel = relayCreator(maRel)
|
||||
|
||||
await src.start()
|
||||
await rel.start()
|
||||
let daemonNode = await newDaemonApi()
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let maStr =
|
||||
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
|
||||
let maddr = MultiAddress.init(maStr).tryGet()
|
||||
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
|
||||
await rel.connect(daemonPeer.peer, daemonPeer.addresses)
|
||||
|
||||
await daemonNode.addHandler(@["/testCustom"], daemonHandler)
|
||||
|
||||
let conn = await src.dial(daemonPeer.peer, @[maddr], @["/testCustom"])
|
||||
|
||||
await conn.writeLp("line1")
|
||||
check string.fromBytes(await conn.readLp(1024)) == "line2"
|
||||
|
||||
await conn.writeLp("line3")
|
||||
check string.fromBytes(await conn.readLp(1024)) == "line4"
|
||||
|
||||
closeBlocker.complete()
|
||||
await daemonFinished
|
||||
await conn.close()
|
||||
await allFutures(src.stop(), rel.stop())
|
||||
try:
|
||||
await daemonNode.close()
|
||||
except CatchableError as e:
|
||||
when defined(windows):
|
||||
# On Windows, daemon close may fail due to socket race condition
|
||||
# This is expected behavior and can be safely ignored
|
||||
discard
|
||||
else:
|
||||
raise e
|
||||
|
||||
asyncTest "DaemonSrc -> NativeRelay -> NativeDst":
|
||||
proc customHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
check "line1" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("line2")
|
||||
check "line3" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("line4")
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
let protos = @["/customProto", RelayV1Codec]
|
||||
var customProto = new LPProtocol
|
||||
customProto.handler = customHandler
|
||||
customProto.codec = protos[0]
|
||||
let
|
||||
maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
rel = relayCreator(maRel)
|
||||
dst = relayCreator(maDst, relay = RelayClient.new())
|
||||
|
||||
dst.mount(customProto)
|
||||
await rel.start()
|
||||
await dst.start()
|
||||
let daemonNode = await newDaemonApi()
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let maStr =
|
||||
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
|
||||
let maddr = MultiAddress.init(maStr).tryGet()
|
||||
await daemonNode.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
|
||||
await rel.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
|
||||
await daemonNode.connect(dst.peerInfo.peerId, @[maddr])
|
||||
var stream = await daemonNode.openStream(dst.peerInfo.peerId, protos)
|
||||
|
||||
discard await stream.transp.writeLp("line1")
|
||||
check string.fromBytes(await stream.transp.readLp()) == "line2"
|
||||
discard await stream.transp.writeLp("line3")
|
||||
check string.fromBytes(await stream.transp.readLp()) == "line4"
|
||||
|
||||
await allFutures(dst.stop(), rel.stop())
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "NativeSrc -> DaemonRelay -> NativeDst":
|
||||
proc customHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
check "line1" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("line2")
|
||||
check "line3" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("line4")
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
let protos = @["/customProto", RelayV1Codec]
|
||||
var customProto = new LPProtocol
|
||||
customProto.handler = customHandler
|
||||
customProto.codec = protos[0]
|
||||
let
|
||||
maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
src = relayCreator(maSrc, relay = RelayClient.new())
|
||||
dst = relayCreator(maDst, relay = RelayClient.new())
|
||||
|
||||
dst.mount(customProto)
|
||||
await src.start()
|
||||
await dst.start()
|
||||
let daemonNode = await newDaemonApi({RelayHop})
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let maStr = $daemonPeer.addresses[0] & "/p2p/" & $daemonPeer.peer & "/p2p-circuit"
|
||||
let maddr = MultiAddress.init(maStr).tryGet()
|
||||
await src.connect(daemonPeer.peer, daemonPeer.addresses)
|
||||
await daemonNode.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
|
||||
let conn = await src.dial(dst.peerInfo.peerId, @[maddr], protos[0])
|
||||
|
||||
await conn.writeLp("line1")
|
||||
check string.fromBytes(await conn.readLp(1024)) == "line2"
|
||||
|
||||
await conn.writeLp("line3")
|
||||
check string.fromBytes(await conn.readLp(1024)) == "line4"
|
||||
|
||||
await allFutures(src.stop(), dst.stop())
|
||||
await daemonNode.close()
|
||||
@@ -1,4 +1,5 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
testdiscoverymngr, testrendezvous, testrendezvousprotobuf, testrendezvousinterface
|
||||
testdiscoverymngr, testrendezvous, testrendezvouserrors, testrendezvousprotobuf,
|
||||
testrendezvousinterface
|
||||
|
||||
@@ -9,9 +9,17 @@
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import sequtils, strutils
|
||||
import sequtils
|
||||
import chronos
|
||||
import ../../libp2p/[protocols/rendezvous, switch]
|
||||
import
|
||||
../../libp2p/[
|
||||
protocols/rendezvous,
|
||||
protocols/rendezvous/protobuf,
|
||||
peerinfo,
|
||||
switch,
|
||||
routing_record,
|
||||
crypto/crypto,
|
||||
]
|
||||
import ../../libp2p/discovery/discoverymngr
|
||||
import ../../libp2p/utils/offsettedseq
|
||||
import ../helpers
|
||||
@@ -73,6 +81,20 @@ suite "RendezVous":
|
||||
peerRecords.len == 1
|
||||
peerRecords[0] == peerNodes[0].switch.peerInfo.signedPeerRecord.data
|
||||
|
||||
asyncTest "Peer is not registered when peer record validation fails":
|
||||
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
|
||||
(rendezvousNode & peerNodes).startAndDeferStop()
|
||||
|
||||
await connectNodes(peerNodes[0], rendezvousNode)
|
||||
|
||||
peerNodes[0].switch.peerInfo.signedPeerRecord =
|
||||
createCorruptedSignedPeerRecord(peerNodes[0].switch.peerInfo.peerId)
|
||||
|
||||
const namespace = "foo"
|
||||
await peerNodes[0].advertise(namespace)
|
||||
|
||||
check rendezvousNode.registered.s.len == 0
|
||||
|
||||
asyncTest "Unsubscribe removes registered peer from remote":
|
||||
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
|
||||
(rendezvousNode & peerNodes).startAndDeferStop()
|
||||
@@ -87,6 +109,17 @@ suite "RendezVous":
|
||||
await peerNodes[0].unsubscribe(namespace)
|
||||
check (await peerNodes[0].request(Opt.some(namespace))).len == 0
|
||||
|
||||
asyncTest "Unsubscribe for not registered namespace is ignored":
|
||||
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
|
||||
(rendezvousNode & peerNodes).startAndDeferStop()
|
||||
|
||||
await connectNodes(peerNodes[0], rendezvousNode)
|
||||
|
||||
await peerNodes[0].advertise("foo")
|
||||
await peerNodes[0].unsubscribe("bar")
|
||||
|
||||
check rendezvousNode.registered.s.len == 1
|
||||
|
||||
asyncTest "Consecutive requests with namespace returns peers with pagination":
|
||||
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(11)
|
||||
(rendezvousNode & peerNodes).startAndDeferStop()
|
||||
@@ -447,26 +480,3 @@ suite "RendezVous":
|
||||
# 1001st registration ignored, limit reached
|
||||
await peerRdv.advertise(namespace)
|
||||
check rendezvousNode.registered.s.len == RegistrationLimitPerPeer
|
||||
|
||||
asyncTest "Various local error":
|
||||
let rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
|
||||
expect AdvertiseError:
|
||||
discard await rdv.request(Opt.some("A".repeat(300)))
|
||||
expect AdvertiseError:
|
||||
discard await rdv.request(Opt.some("A"), -1)
|
||||
expect AdvertiseError:
|
||||
discard await rdv.request(Opt.some("A"), 3000)
|
||||
expect AdvertiseError:
|
||||
await rdv.advertise("A".repeat(300))
|
||||
expect AdvertiseError:
|
||||
await rdv.advertise("A", 73.hours)
|
||||
expect AdvertiseError:
|
||||
await rdv.advertise("A", 30.seconds)
|
||||
|
||||
test "Various config error":
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(minDuration = 30.seconds)
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(maxDuration = 73.hours)
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)
|
||||
|
||||
153
tests/discovery/testrendezvouserrors.nim
Normal file
153
tests/discovery/testrendezvouserrors.nim
Normal file
@@ -0,0 +1,153 @@
|
||||
{.used.}
|
||||
|
||||
# Nim-Libp2p
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import sequtils
|
||||
import strformat
|
||||
import strutils
|
||||
import chronos
|
||||
import
|
||||
../../libp2p/[
|
||||
protocols/rendezvous,
|
||||
protocols/rendezvous/protobuf,
|
||||
peerinfo,
|
||||
switch,
|
||||
routing_record,
|
||||
crypto/crypto,
|
||||
]
|
||||
import ../../libp2p/discovery/discoverymngr
|
||||
import ../helpers
|
||||
import ./utils
|
||||
|
||||
suite "RendezVous Errors":
|
||||
teardown:
|
||||
checkTrackers()
|
||||
|
||||
asyncTest "Various local error":
|
||||
let rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
|
||||
expect AdvertiseError:
|
||||
discard await rdv.request(Opt.some("A".repeat(300)))
|
||||
expect AdvertiseError:
|
||||
discard await rdv.request(Opt.some("A"), -1)
|
||||
expect AdvertiseError:
|
||||
discard await rdv.request(Opt.some("A"), 3000)
|
||||
expect AdvertiseError:
|
||||
await rdv.advertise("A".repeat(300))
|
||||
expect AdvertiseError:
|
||||
await rdv.advertise("A", 73.hours)
|
||||
expect AdvertiseError:
|
||||
await rdv.advertise("A", 30.seconds)
|
||||
|
||||
test "Various config error":
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(minDuration = 30.seconds)
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(maxDuration = 73.hours)
|
||||
expect RendezVousError:
|
||||
discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)
|
||||
|
||||
let testCases =
|
||||
@[
|
||||
(
|
||||
"Register - Invalid Namespace",
|
||||
(
|
||||
proc(node: RendezVous): Message =
|
||||
prepareRegisterMessage(
|
||||
"A".repeat(300),
|
||||
node.switch.peerInfo.signedPeerRecord.encode().get,
|
||||
2.hours,
|
||||
)
|
||||
),
|
||||
ResponseStatus.InvalidNamespace,
|
||||
),
|
||||
(
|
||||
"Register - Invalid Signed Peer Record",
|
||||
(
|
||||
proc(node: RendezVous): Message =
|
||||
# Malformed SPR - empty bytes will fail validation
|
||||
prepareRegisterMessage("namespace", newSeq[byte](), 2.hours)
|
||||
),
|
||||
ResponseStatus.InvalidSignedPeerRecord,
|
||||
),
|
||||
(
|
||||
"Register - Invalid TTL",
|
||||
(
|
||||
proc(node: RendezVous): Message =
|
||||
prepareRegisterMessage(
|
||||
"namespace", node.switch.peerInfo.signedPeerRecord.encode().get, 73.hours
|
||||
)
|
||||
),
|
||||
ResponseStatus.InvalidTTL,
|
||||
),
|
||||
(
|
||||
"Discover - Invalid Namespace",
|
||||
(
|
||||
proc(node: RendezVous): Message =
|
||||
prepareDiscoverMessage(ns = Opt.some("A".repeat(300)))
|
||||
),
|
||||
ResponseStatus.InvalidNamespace,
|
||||
),
|
||||
(
|
||||
"Discover - Invalid Cookie",
|
||||
(
|
||||
proc(node: RendezVous): Message =
|
||||
# Empty buffer will fail Cookie.decode().tryGet() and yield InvalidCookie
|
||||
prepareDiscoverMessage(cookie = Opt.some(newSeq[byte]()))
|
||||
),
|
||||
ResponseStatus.InvalidCookie,
|
||||
),
|
||||
]
|
||||
|
||||
for test in testCases:
|
||||
let (testName, getMessage, expectedStatus) = test
|
||||
|
||||
asyncTest &"Node returns ERROR_CODE for invalid message - {testName}":
|
||||
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
|
||||
(rendezvousNode & peerNodes).startAndDeferStop()
|
||||
|
||||
await connectNodes(peerNodes[0], rendezvousNode)
|
||||
|
||||
let
|
||||
peerNode = peerNodes[0]
|
||||
messageBuf = encode(getMessage(peerNode)).buffer
|
||||
|
||||
let
|
||||
responseBuf = await sendRdvMessage(peerNode, rendezvousNode, messageBuf)
|
||||
responseMessage = Message.decode(responseBuf).tryGet()
|
||||
actualStatus =
|
||||
if responseMessage.registerResponse.isSome():
|
||||
responseMessage.registerResponse.get.status
|
||||
else:
|
||||
responseMessage.discoverResponse.get.status
|
||||
|
||||
check actualStatus == expectedStatus
|
||||
|
||||
asyncTest "Node returns NotAuthorized when Register exceeding peer limit":
|
||||
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
|
||||
(rendezvousNode & peerNodes).startAndDeferStop()
|
||||
|
||||
await connectNodes(peerNodes[0], rendezvousNode)
|
||||
|
||||
# Pre-populate registrations up to the limit for this peer under the same namespace
|
||||
let namespace = "namespaceNA"
|
||||
await populatePeerRegistrations(
|
||||
peerNodes[0], rendezvousNode, namespace, RegistrationLimitPerPeer
|
||||
)
|
||||
|
||||
# Attempt one more registration which should be rejected with NotAuthorized
|
||||
let messageBuf = encode(
|
||||
prepareRegisterMessage(
|
||||
namespace, peerNodes[0].switch.peerInfo.signedPeerRecord.encode().get, 2.hours
|
||||
)
|
||||
).buffer
|
||||
|
||||
let responseBuf = await sendRdvMessage(peerNodes[0], rendezvousNode, messageBuf)
|
||||
let responseMessage = Message.decode(responseBuf).tryGet()
|
||||
check responseMessage.registerResponse.get.status == ResponseStatus.NotAuthorized
|
||||
@@ -1,6 +1,16 @@
|
||||
import sequtils
|
||||
import chronos
|
||||
import ../../libp2p/[protobuf/minprotobuf, protocols/rendezvous, switch, builders]
|
||||
import sequtils
|
||||
import
|
||||
../../libp2p/[
|
||||
builders,
|
||||
crypto/crypto,
|
||||
peerid,
|
||||
protobuf/minprotobuf,
|
||||
protocols/rendezvous,
|
||||
protocols/rendezvous/protobuf,
|
||||
routing_record,
|
||||
switch,
|
||||
]
|
||||
|
||||
proc createSwitch*(rdv: RendezVous = RendezVous.new()): Switch =
|
||||
SwitchBuilder
|
||||
@@ -76,3 +86,41 @@ proc populatePeerRegistrations*(
|
||||
let record = targetRdv.registered.s[0]
|
||||
for i in 0 ..< count - 1:
|
||||
targetRdv.registered.s.add(record)
|
||||
|
||||
proc createCorruptedSignedPeerRecord*(peerId: PeerId): SignedPeerRecord =
|
||||
let rng = newRng()
|
||||
let wrongPrivKey = PrivateKey.random(rng[]).tryGet()
|
||||
let record = PeerRecord.init(peerId, @[])
|
||||
SignedPeerRecord.init(wrongPrivKey, record).tryGet()
|
||||
|
||||
proc sendRdvMessage*(
|
||||
node: RendezVous, target: RendezVous, buffer: seq[byte]
|
||||
): Future[seq[byte]] {.async.} =
|
||||
let conn = await node.switch.dial(target.switch.peerInfo.peerId, RendezVousCodec)
|
||||
defer:
|
||||
await conn.close()
|
||||
|
||||
await conn.writeLp(buffer)
|
||||
|
||||
let response = await conn.readLp(4096)
|
||||
response
|
||||
|
||||
proc prepareRegisterMessage*(
|
||||
namespace: string, spr: seq[byte], ttl: Duration
|
||||
): Message =
|
||||
Message(
|
||||
msgType: MessageType.Register,
|
||||
register: Opt.some(
|
||||
Register(ns: namespace, signedPeerRecord: spr, ttl: Opt.some(ttl.seconds.uint64))
|
||||
),
|
||||
)
|
||||
|
||||
proc prepareDiscoverMessage*(
|
||||
ns: Opt[string] = Opt.none(string),
|
||||
limit: Opt[uint64] = Opt.none(uint64),
|
||||
cookie: Opt[seq[byte]] = Opt.none(seq[byte]),
|
||||
): Message =
|
||||
Message(
|
||||
msgType: MessageType.Discover,
|
||||
discover: Opt.some(Discover(ns: ns, limit: limit, cookie: cookie)),
|
||||
)
|
||||
|
||||
101
tests/mix/testfragmentation.nim
Normal file
101
tests/mix/testfragmentation.nim
Normal file
@@ -0,0 +1,101 @@
|
||||
{.used.}
|
||||
|
||||
import results, unittest
|
||||
import ../../libp2p/peerid
|
||||
import ../../libp2p/protocols/mix/[serialization, fragmentation]
|
||||
|
||||
suite "Fragmentation":
|
||||
let peerId =
|
||||
PeerId.init("16Uiu2HAmFkwLVsVh6gGPmSm9R3X4scJ5thVdKfWYeJsKeVrbcgVC").get()
|
||||
|
||||
test "serialize and deserialize message chunk":
|
||||
let
|
||||
message = newSeq[byte](DataSize)
|
||||
chunks = padAndChunkMessage(message, peerId)
|
||||
serialized = chunks[0].serialize()
|
||||
deserialized =
|
||||
MessageChunk.deserialize(serialized).expect("Deserialization error")
|
||||
|
||||
check chunks[0] == deserialized
|
||||
|
||||
test "pad and unpad small message":
|
||||
let
|
||||
message = cast[seq[byte]]("Hello, World!")
|
||||
messageBytesLen = len(message)
|
||||
paddedMsg = addPadding(message, peerId)
|
||||
unpaddedMessage = removePadding(paddedMsg).expect("Unpad error")
|
||||
|
||||
let (paddingLength, data, _) = paddedMsg.get()
|
||||
|
||||
check:
|
||||
paddingLength == uint16(DataSize - messageBytesLen)
|
||||
data.len == DataSize
|
||||
unpaddedMessage.len == messageBytesLen
|
||||
|
||||
test "pad and chunk large message":
|
||||
let
|
||||
message = newSeq[byte](MessageSize * 2 + (MessageSize - 1))
|
||||
messageBytesLen = len(message)
|
||||
chunks = padAndChunkMessage(message, peerId)
|
||||
totalChunks = max(1, ceilDiv(messageBytesLen, DataSize))
|
||||
|
||||
check chunks.len == totalChunks
|
||||
|
||||
for i in 0 ..< totalChunks:
|
||||
let (paddingLength, data, _) = chunks[i].get()
|
||||
if i != totalChunks - 1:
|
||||
check paddingLength == 0
|
||||
else:
|
||||
let chunkSize = messageBytesLen mod DataSize
|
||||
check paddingLength == uint16(DataSize - chunkSize)
|
||||
|
||||
check data.len == DataSize
|
||||
|
||||
test "chunk sequence numbers are consecutive":
|
||||
let
|
||||
message = newSeq[byte](MessageSize * 3)
|
||||
messageBytesLen = len(message)
|
||||
chunks = padAndChunkMessage(message, peerId)
|
||||
totalChunks = max(1, ceilDiv(messageBytesLen, DataSize))
|
||||
|
||||
check chunks.len == totalChunks
|
||||
|
||||
let (_, _, firstSeqNo) = chunks[0].get()
|
||||
|
||||
for i in 1 ..< totalChunks:
|
||||
let (_, _, seqNo) = chunks[i].get()
|
||||
check seqNo == firstSeqNo + uint32(i)
|
||||
|
||||
test "chunk data reconstructs original message":
|
||||
let
|
||||
message = cast[seq[byte]]("This is a test message that will be split into multiple chunks.")
|
||||
chunks = padAndChunkMessage(message, peerId)
|
||||
|
||||
var reconstructed: seq[byte]
|
||||
for chunk in chunks:
|
||||
let (paddingLength, data, _) = chunk.get()
|
||||
reconstructed.add(data[paddingLength.int ..^ 1])
|
||||
|
||||
check reconstructed == message
|
||||
|
||||
test "empty message handling":
|
||||
let
|
||||
message = cast[seq[byte]]("")
|
||||
chunks = padAndChunkMessage(message, peerId)
|
||||
|
||||
check chunks.len == 1
|
||||
|
||||
let (paddingLength, _, _) = chunks[0].get()
|
||||
|
||||
check paddingLength == uint16(DataSize)
|
||||
|
||||
test "message size equal to chunk size":
|
||||
let
|
||||
message = newSeq[byte](DataSize)
|
||||
chunks = padAndChunkMessage(message, peerId)
|
||||
|
||||
check chunks.len == 1
|
||||
|
||||
let (paddingLength, _, _) = chunks[0].get()
|
||||
|
||||
check paddingLength == 0
|
||||
84
tests/mix/testmixnode.nim
Normal file
84
tests/mix/testmixnode.nim
Normal file
@@ -0,0 +1,84 @@
|
||||
{.used.}
|
||||
|
||||
import strformat, results, unittest2
|
||||
import ../../libp2p/[crypto/crypto, crypto/secp, multiaddress, multicodec, peerid]
|
||||
import ../../libp2p/protocols/mix/[curve25519, mix_node]
|
||||
|
||||
suite "Mix Node Tests":
|
||||
var mixNodes {.threadvar.}: MixNodes
|
||||
|
||||
setup:
|
||||
const count = 5
|
||||
let mixNodes = initializeMixNodes(count).expect("could not generate mix nodes")
|
||||
check:
|
||||
mixNodes.len == count
|
||||
|
||||
teardown:
|
||||
deleteNodeInfoFolder()
|
||||
deletePubInfoFolder()
|
||||
|
||||
test "get mix node by index":
|
||||
for i in 0 ..< count:
|
||||
let node = mixNodes[i]
|
||||
let
|
||||
(peerId, multiAddr, mixPubKey, mixPrivKey, libp2pPubKey, libp2pPrivKey) =
|
||||
node.get()
|
||||
pubKeyProto = PublicKey(scheme: Secp256k1, skkey: libp2pPubKey)
|
||||
expectedPeerId = PeerId.init(pubKeyProto).get()
|
||||
expectedMA = MultiAddress.init(fmt"/ip4/0.0.0.0/tcp/{4242 + i}").tryGet()
|
||||
check:
|
||||
peerId == expectedPeerId
|
||||
multiAddr == expectedMA
|
||||
fieldElementToBytes(mixPubKey).len == FieldElementSize
|
||||
fieldElementToBytes(mixPrivKey).len == FieldElementSize
|
||||
libp2pPubKey.getBytes().len == SkRawPublicKeySize
|
||||
libp2pPrivKey.getBytes().len == SkRawPrivateKeySize
|
||||
|
||||
test "find mixnode by peerid":
|
||||
for i in 0 ..< count:
|
||||
let
|
||||
node = mixNodes[i]
|
||||
foundNode = mixNodes.findByPeerId(node.peerId).expect("find mix node error")
|
||||
|
||||
check:
|
||||
foundNode == node
|
||||
|
||||
test "invalid_peer_id_lookup":
|
||||
let peerId = PeerId.random().expect("could not generate peerId")
|
||||
check:
|
||||
mixNodes.findByPeerId(peerId).isErr()
|
||||
|
||||
test "write and read mixnodeinfo":
|
||||
for i in 0 ..< count:
|
||||
let node = mixNodes[i]
|
||||
node.writeToFile(i).expect("File write error")
|
||||
let readNode = MixNodeInfo.readFromFile(i).expect("Read node error")
|
||||
|
||||
check:
|
||||
readNode == node
|
||||
|
||||
test "write and read mixpubinfo":
|
||||
for i in 0 ..< count:
|
||||
let mixPubInfo =
|
||||
mixNodes.getMixPubInfoByIndex(i).expect("couldnt obtain mixpubinfo")
|
||||
mixPubInfo.writeToFile(i).expect("File write error")
|
||||
let readNode = MixPubInfo.readFromFile(i).expect("File read error")
|
||||
|
||||
check:
|
||||
mixPubInfo == readNode
|
||||
|
||||
test "read nonexistent mixnodeinfo":
|
||||
check:
|
||||
MixNodeInfo.readFromFile(999).isErr()
|
||||
|
||||
test "generate mix nodes with different ports":
|
||||
const count = 3
|
||||
let basePort = 5000
|
||||
let mixNodes =
|
||||
initializeMixNodes(count, basePort).expect("could not generate mix nodes")
|
||||
|
||||
for i in 0 ..< count:
|
||||
let tcpPort = mixNodes[i].multiAddr.getPart(multiCodec("tcp")).value()
|
||||
let expectedPort = MultiAddress.init(fmt"/tcp/{basePort + i}").tryGet()
|
||||
check:
|
||||
tcpPort == expectedPort
|
||||
@@ -2,58 +2,56 @@
|
||||
|
||||
import results, unittest
|
||||
import ../../libp2p/protocols/mix/[serialization, multiaddr]
|
||||
import ../../libp2p/multiaddress
|
||||
import ../../libp2p/[peerid, multiaddress]
|
||||
|
||||
template maddr(ma: string): MultiAddress =
|
||||
MultiAddress.init(ma).tryGet()
|
||||
|
||||
template maddrConversionShouldFail(ma: string, msg: string) =
|
||||
proc maddrConversionShouldFail(ma: string, msg: string) =
|
||||
test msg:
|
||||
echo MultiAddress
|
||||
.init(ma)
|
||||
.expect("could not initialize multiaddr")
|
||||
.multiAddrToBytes()
|
||||
.error()
|
||||
let peerId = PeerId.random().expect("could not generate peerId")
|
||||
let ma = MultiAddress.init(ma).expect("could not initialize multiaddr")
|
||||
check:
|
||||
MultiAddress
|
||||
.init(ma)
|
||||
.expect("could not initialize multiaddr")
|
||||
.multiAddrToBytes().isErr
|
||||
multiAddrToBytes(peerId, ma).isErr
|
||||
|
||||
suite "Utils tests":
|
||||
test "multi_addr_conversion":
|
||||
test "multiaddress conversion":
|
||||
let multiAddrs = [
|
||||
"/ip4/0.0.0.0/tcp/4242/p2p/16Uiu2HAmFkwLVsVh6gGPmSm9R3X4scJ5thVdKfWYeJsKeVrbcgVC",
|
||||
"/ip4/10.0.0.1/tcp/1234/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2",
|
||||
"/ip4/192.168.1.1/udp/8080/quic-v1/p2p/16Uiu2HAm6WNzw8AssyPscYYi8x1bY5wXyQrGTShRH75bh5dPCjBQ",
|
||||
"/ip4/10.0.0.1/tcp/1234/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2/p2p-circuit/p2p/16Uiu2HAm6WNzw8AssyPscYYi8x1bY5wXyQrGTShRH75bh5dPCjBQ",
|
||||
"/ip4/10.0.0.1/udp/1234/quic-v1/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2/p2p-circuit/p2p/16Uiu2HAm6WNzw8AssyPscYYi8x1bY5wXyQrGTShRH75bh5dPCjBQ",
|
||||
"/ip4/0.0.0.0/tcp/4242", "/ip4/10.0.0.1/tcp/1234",
|
||||
"/ip4/192.168.1.1/udp/8080/quic-v1",
|
||||
"/ip4/10.0.0.1/tcp/1234/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2/p2p-circuit",
|
||||
"/ip4/10.0.0.1/udp/1234/quic-v1/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2/p2p-circuit",
|
||||
]
|
||||
|
||||
for multiAddr in multiAddrs:
|
||||
let ma = maddr(multiAddr)
|
||||
let multiAddrBytes = ma.multiAddrToBytes().expect("conversion failed")
|
||||
let
|
||||
ma = maddr(multiAddr)
|
||||
peerId = PeerId.random().expect("could not generate peerId")
|
||||
multiAddrBytes = multiAddrToBytes(peerId, ma).expect("conversion failed")
|
||||
|
||||
check multiAddrBytes.len == AddrSize
|
||||
let deserializedMa = bytesToMultiAddr(multiAddrBytes).expect("conversion failed")
|
||||
check deserializedMa == ma
|
||||
|
||||
let (dPeerId, deserializedMa) =
|
||||
bytesToMultiAddr(multiAddrBytes).expect("conversion failed")
|
||||
|
||||
check:
|
||||
deserializedMa == ma
|
||||
dPeerId == peerId
|
||||
|
||||
maddrConversionShouldFail("/ip4/0.0.0.0/tcp/4242/quic-v1/", "invalid protocol")
|
||||
|
||||
maddrConversionShouldFail("/ip4/0.0.0.0", "invalid multiaddress format")
|
||||
|
||||
maddrConversionShouldFail(
|
||||
"/ip4/0.0.0.0/tcp/4242/quic-v1/p2p/16Uiu2HAmFkwLVsVh6gGPmSm9R3X4scJ5thVdKfWYeJsKeVrbcgVC",
|
||||
"invalid_protocol",
|
||||
"/ip4/0.0.0.0/tcp/4242/p2p-circuit", "invalid multiaddress format circuit relay"
|
||||
)
|
||||
|
||||
maddrConversionShouldFail(
|
||||
"/ip4/0.0.0.0/tcp/4242/p2p/QmcycySVeRSftFQGM392xCqDh6UUbhSU9ykNpxrFBPX3gJ",
|
||||
"invalid_peerid_length",
|
||||
"/ip4/0.0.0.0/tcp/4242/p2p/QmcycySVeRSftFQGM392xCqDh6UUbhSU9ykNpxrFBPX3gJ/p2p-circuit",
|
||||
"invalid peerId in circuit relay addr",
|
||||
)
|
||||
|
||||
maddrConversionShouldFail("/ip4/0.0.0.0/tcp/4242", "invalid_multiaddress_format")
|
||||
|
||||
maddrConversionShouldFail(
|
||||
"/ip4/0.0.0.0/tcp/4242/p2p-circuit", "invalid_multiaddress_format_circuit_relay"
|
||||
)
|
||||
|
||||
test "invalid_addr_length":
|
||||
test "invalid address length":
|
||||
let invalidBytes = newSeq[byte](AddrSize - 1)
|
||||
check:
|
||||
bytesToMultiAddr(invalidBytes).isErr
|
||||
|
||||
@@ -56,7 +56,7 @@ suite "serialization_tests":
|
||||
header = Header.init(
|
||||
newSeq[byte](AlphaSize), newSeq[byte](BetaSize), newSeq[byte](GammaSize)
|
||||
)
|
||||
payload = newSeq[byte](payloadSize)
|
||||
payload = newSeq[byte](PayloadSize)
|
||||
packet = SphinxPacket.init(header, payload)
|
||||
|
||||
let serialized = packet.serialize()
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
{.used.}
|
||||
|
||||
import random, results, unittest
|
||||
import random, results, unittest, chronicles
|
||||
import ../../libp2p/crypto/crypto
|
||||
import ../../libp2p/protocols/mix/[curve25519, serialization, sphinx, tag_manager]
|
||||
import bearssl/rand
|
||||
|
||||
# Helper function to pad/truncate message
|
||||
proc padMessage(message: openArray[byte], size: int): seq[byte] =
|
||||
proc addPadding(message: openArray[byte], size: int): seq[byte] =
|
||||
if message.len >= size:
|
||||
return message[0 .. size - 1] # Truncate if larger
|
||||
else:
|
||||
@@ -39,8 +39,8 @@ proc createDummyData(): (
|
||||
dest = Hop.init(newSeq[byte](AddrSize))
|
||||
return (message, privateKeys, publicKeys, delay, hops, dest)
|
||||
|
||||
proc randomI(): I =
|
||||
newRng()[].generate(I)
|
||||
template randomI(): SURBIdentifier =
|
||||
newRng()[].generate(SURBIdentifier)
|
||||
|
||||
# Unit tests for sphinx.nim
|
||||
suite "Sphinx Tests":
|
||||
@@ -52,7 +52,7 @@ suite "Sphinx Tests":
|
||||
teardown:
|
||||
clearTags(tm)
|
||||
|
||||
test "sphinx_wrap_and_process":
|
||||
test "sphinx wrap and process":
|
||||
let (message, privateKeys, publicKeys, delay, hops, dest) = createDummyData()
|
||||
|
||||
let packetBytes = wrapInSphinxPacket(message, publicKeys, delay, hops, dest).expect(
|
||||
@@ -94,7 +94,7 @@ suite "Sphinx Tests":
|
||||
processedSP3.status == Exit
|
||||
processedSP3.messageChunk == message
|
||||
|
||||
test "sphinx_wrap_empty_public_keys":
|
||||
test "sphinx wrap empty public keys":
|
||||
let (message, _, _, delay, _, dest) = createDummyData()
|
||||
check wrapInSphinxPacket(message, @[], delay, @[], dest).isErr
|
||||
|
||||
@@ -118,7 +118,7 @@ suite "Sphinx Tests":
|
||||
|
||||
check invalidMacPkt.status == InvalidMAC
|
||||
|
||||
test "sphinx_process_duplicate_tag":
|
||||
test "sphinx process duplicate tag":
|
||||
let (message, privateKeys, publicKeys, delay, hops, dest) = createDummyData()
|
||||
|
||||
let packetBytes = wrapInSphinxPacket(message, publicKeys, delay, hops, dest).expect(
|
||||
@@ -140,7 +140,7 @@ suite "Sphinx Tests":
|
||||
|
||||
check processedSP2.status == Duplicate
|
||||
|
||||
test "sphinx_wrap_and_process_message_sizes":
|
||||
test "sphinx wrap and process message sizes":
|
||||
let MessageSizes = @[32, 64, 128, 256, 512]
|
||||
for size in MessageSizes:
|
||||
let (_, privateKeys, publicKeys, delay, hops, dest) = createDummyData()
|
||||
@@ -148,7 +148,7 @@ suite "Sphinx Tests":
|
||||
randomize()
|
||||
for i in 0 ..< size:
|
||||
message[i] = byte(rand(256))
|
||||
let paddedMessage = padMessage(message, MessageSize)
|
||||
let paddedMessage = addPadding(message, MessageSize)
|
||||
|
||||
let packetBytes = wrapInSphinxPacket(paddedMessage, publicKeys, delay, hops, dest)
|
||||
.expect("Sphinx wrap error")
|
||||
@@ -186,3 +186,150 @@ suite "Sphinx Tests":
|
||||
check:
|
||||
processedSP3.status == Exit
|
||||
processedSP3.messageChunk == paddedMessage
|
||||
|
||||
test "create and use surb":
|
||||
let (message, privateKeys, publicKeys, delay, hops, _) = createDummyData()
|
||||
|
||||
let surb =
|
||||
createSURB(publicKeys, delay, hops, randomI()).expect("Create SURB error")
|
||||
let packetBytes = useSURB(surb, message).serialize()
|
||||
|
||||
check packetBytes.len == PacketSize
|
||||
|
||||
let packet = SphinxPacket.deserialize(packetBytes).expect("Sphinx wrap error")
|
||||
let processedSP1 =
|
||||
processSphinxPacket(packet, privateKeys[0], tm).expect("Sphinx processing error")
|
||||
|
||||
check:
|
||||
processedSP1.status == Intermediate
|
||||
processedSP1.serializedSphinxPacket.len == PacketSize
|
||||
|
||||
let processedPacket1 = SphinxPacket
|
||||
.deserialize(processedSP1.serializedSphinxPacket)
|
||||
.expect("Sphinx wrap error")
|
||||
|
||||
let processedSP2 = processSphinxPacket(processedPacket1, privateKeys[1], tm).expect(
|
||||
"Sphinx processing error"
|
||||
)
|
||||
|
||||
check:
|
||||
processedSP2.status == Intermediate
|
||||
processedSP2.serializedSphinxPacket.len == PacketSize
|
||||
|
||||
let processedPacket2 = SphinxPacket
|
||||
.deserialize(processedSP2.serializedSphinxPacket)
|
||||
.expect("Sphinx wrap error")
|
||||
|
||||
let processedSP3 = processSphinxPacket(processedPacket2, privateKeys[2], tm).expect(
|
||||
"Sphinx processing error"
|
||||
)
|
||||
|
||||
check processedSP3.status == Reply
|
||||
|
||||
let msg = processReply(surb.key, surb.secret.get(), processedSP3.delta_prime).expect(
|
||||
"Reply processing failed"
|
||||
)
|
||||
|
||||
check msg == message
|
||||
|
||||
test "create surb empty public keys":
|
||||
let (message, _, _, delay, _, _) = createDummyData()
|
||||
check createSURB(@[], delay, @[], randomI()).isErr()
|
||||
|
||||
test "surb sphinx process invalid mac":
|
||||
let (message, privateKeys, publicKeys, delay, hops, _) = createDummyData()
|
||||
|
||||
let surb =
|
||||
createSURB(publicKeys, delay, hops, randomI()).expect("Create SURB error")
|
||||
|
||||
let packetBytes = useSURB(surb, message).serialize()
|
||||
|
||||
check packetBytes.len == PacketSize
|
||||
|
||||
# Corrupt the MAC for testing
|
||||
var tamperedPacketBytes = packetBytes
|
||||
tamperedPacketBytes[0] = packetBytes[0] xor 0x01
|
||||
|
||||
let tamperedPacket =
|
||||
SphinxPacket.deserialize(tamperedPacketBytes).expect("Sphinx wrap error")
|
||||
|
||||
let processedSP1 = processSphinxPacket(tamperedPacket, privateKeys[0], tm).expect(
|
||||
"Sphinx processing error"
|
||||
)
|
||||
|
||||
check processedSP1.status == InvalidMAC
|
||||
|
||||
test "surb sphinx process duplicate tag":
|
||||
let (message, privateKeys, publicKeys, delay, hops, _) = createDummyData()
|
||||
|
||||
let surb =
|
||||
createSURB(publicKeys, delay, hops, randomI()).expect("Create SURB error")
|
||||
|
||||
let packetBytes = useSURB(surb, message).serialize()
|
||||
|
||||
check packetBytes.len == PacketSize
|
||||
|
||||
let packet = SphinxPacket.deserialize(packetBytes).expect("Sphinx wrap error")
|
||||
|
||||
# Process the packet twice to test duplicate tag handling
|
||||
let processedSP1 =
|
||||
processSphinxPacket(packet, privateKeys[0], tm).expect("Sphinx processing error")
|
||||
|
||||
check processedSP1.status == Intermediate
|
||||
|
||||
let processedSP2 =
|
||||
processSphinxPacket(packet, privateKeys[0], tm).expect("Sphinx processing error")
|
||||
|
||||
check processedSP2.status == Duplicate
|
||||
|
||||
test "create and use surb message sizes":
|
||||
let messageSizes = @[32, 64, 128, 256, 512]
|
||||
for size in messageSizes:
|
||||
let (_, privateKeys, publicKeys, delay, hops, _) = createDummyData()
|
||||
var message = newSeq[byte](size)
|
||||
randomize()
|
||||
for i in 0 ..< size:
|
||||
message[i] = byte(rand(256))
|
||||
let paddedMessage = addPadding(message, MessageSize)
|
||||
|
||||
let surb =
|
||||
createSURB(publicKeys, delay, hops, randomI()).expect("Create SURB error")
|
||||
|
||||
let packetBytes = useSURB(surb, Message(paddedMessage)).serialize()
|
||||
|
||||
check packetBytes.len == PacketSize
|
||||
|
||||
let packet = SphinxPacket.deserialize(packetBytes).expect("Sphinx wrap error")
|
||||
|
||||
let processedSP1 = processSphinxPacket(packet, privateKeys[0], tm).expect(
|
||||
"Sphinx processing error"
|
||||
)
|
||||
|
||||
check:
|
||||
processedSP1.status == Intermediate
|
||||
processedSP1.serializedSphinxPacket.len == PacketSize
|
||||
|
||||
let processedPacket1 = SphinxPacket
|
||||
.deserialize(processedSP1.serializedSphinxPacket)
|
||||
.expect("Sphinx wrap error")
|
||||
|
||||
let processedSP2 = processSphinxPacket(processedPacket1, privateKeys[1], tm)
|
||||
.expect("Sphinx processing error")
|
||||
|
||||
check:
|
||||
processedSP2.status == Intermediate
|
||||
processedSP2.serializedSphinxPacket.len == PacketSize
|
||||
|
||||
let processedPacket2 = SphinxPacket
|
||||
.deserialize(processedSP2.serializedSphinxPacket)
|
||||
.expect("Sphinx wrap error")
|
||||
|
||||
let processedSP3 = processSphinxPacket(processedPacket2, privateKeys[2], tm)
|
||||
.expect("Sphinx processing error")
|
||||
|
||||
check processedSP3.status == Reply
|
||||
|
||||
let msg = processReply(surb.key, surb.secret.get(), processedSP3.delta_prime)
|
||||
.expect("Reply processing failed")
|
||||
|
||||
check paddedMessage == msg
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
{.used.}
|
||||
|
||||
import testnative, ./pubsub/testpubsub, testinterop, testdaemon
|
||||
import testnative, ./pubsub/testpubsub
|
||||
|
||||
@@ -31,6 +31,8 @@ import ./helpers
|
||||
when defined(linux) and defined(amd64):
|
||||
{.used.}
|
||||
|
||||
import ../libp2p/utils/ipaddr
|
||||
|
||||
suite "AutoTLS Integration":
|
||||
asyncTeardown:
|
||||
checkTrackers()
|
||||
|
||||
@@ -1,123 +0,0 @@
|
||||
import chronos, unittest2, helpers
|
||||
import
|
||||
../libp2p/daemon/daemonapi,
|
||||
../libp2p/multiaddress,
|
||||
../libp2p/multicodec,
|
||||
../libp2p/cid,
|
||||
../libp2p/multihash,
|
||||
../libp2p/peerid
|
||||
|
||||
when defined(nimHasUsed):
|
||||
{.used.}
|
||||
|
||||
proc identitySpawnTest(): Future[bool] {.async.} =
|
||||
var api = await newDaemonApi()
|
||||
var data = await api.identity()
|
||||
await api.close()
|
||||
result = true
|
||||
|
||||
proc connectStreamTest(): Future[bool] {.async.} =
|
||||
var api1 = await newDaemonApi()
|
||||
var api2 = await newDaemonApi()
|
||||
|
||||
var id1 = await api1.identity()
|
||||
var id2 = await api2.identity()
|
||||
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
|
||||
proc streamHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
var line = await stream.transp.readLine()
|
||||
testFuture.complete(line)
|
||||
|
||||
await api2.addHandler(protos, streamHandler)
|
||||
await api1.connect(id2.peer, id2.addresses)
|
||||
# echo await api1.listPeers()
|
||||
var stream = await api1.openStream(id2.peer, protos)
|
||||
let sent = await stream.transp.write(test & "\r\n")
|
||||
doAssert(sent == len(test) + 2)
|
||||
doAssert((await wait(testFuture, 10.seconds)) == test)
|
||||
await stream.close()
|
||||
await api1.close()
|
||||
await api2.close()
|
||||
result = true
|
||||
|
||||
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
|
||||
var pubsubData = "TEST MESSAGE"
|
||||
var msgData = cast[seq[byte]](pubsubData)
|
||||
var api1, api2: DaemonAPI
|
||||
|
||||
api1 = await newDaemonApi(f)
|
||||
api2 = await newDaemonApi(f)
|
||||
|
||||
var id1 = await api1.identity()
|
||||
var id2 = await api2.identity()
|
||||
|
||||
var resultsCount = 0
|
||||
|
||||
var handlerFuture1 = newFuture[void]()
|
||||
var handlerFuture2 = newFuture[void]()
|
||||
|
||||
proc pubsubHandler1(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async: (raises: [CatchableError]).} =
|
||||
let smsg = cast[string](message.data)
|
||||
if smsg == pubsubData:
|
||||
inc(resultsCount)
|
||||
handlerFuture1.complete()
|
||||
# Callback must return `false` to close subscription channel.
|
||||
result = false
|
||||
|
||||
proc pubsubHandler2(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async: (raises: [CatchableError]).} =
|
||||
let smsg = cast[string](message.data)
|
||||
if smsg == pubsubData:
|
||||
inc(resultsCount)
|
||||
handlerFuture2.complete()
|
||||
# Callback must return `false` to close subscription channel.
|
||||
result = false
|
||||
|
||||
await api1.connect(id2.peer, id2.addresses)
|
||||
await api2.connect(id1.peer, id1.addresses)
|
||||
|
||||
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
|
||||
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
var topics1 = await api1.pubsubGetTopics()
|
||||
var topics2 = await api2.pubsubGetTopics()
|
||||
|
||||
if len(topics1) == 1 and len(topics2) == 1:
|
||||
var peers1 = await api1.pubsubListPeers("test-topic")
|
||||
var peers2 = await api2.pubsubListPeers("test-topic")
|
||||
if len(peers1) == 1 and len(peers2) == 1:
|
||||
# Publish test data via api1.
|
||||
await sleepAsync(250.milliseconds)
|
||||
await api1.pubsubPublish("test-topic", msgData)
|
||||
var res =
|
||||
await one(allFutures(handlerFuture1, handlerFuture2), sleepAsync(5.seconds))
|
||||
|
||||
await api1.close()
|
||||
await api2.close()
|
||||
if resultsCount == 2:
|
||||
result = true
|
||||
|
||||
suite "libp2p-daemon test suite":
|
||||
test "Simple spawn and get identity test":
|
||||
check:
|
||||
waitFor(identitySpawnTest()) == true
|
||||
test "Connect/Accept peer/stream test":
|
||||
check:
|
||||
waitFor(connectStreamTest()) == true
|
||||
asyncTest "GossipSub test":
|
||||
checkUntilTimeoutCustom(10.seconds, 100.milliseconds):
|
||||
(await pubsubTest({PSGossipSub}))
|
||||
asyncTest "FloodSub test":
|
||||
checkUntilTimeoutCustom(10.seconds, 100.milliseconds):
|
||||
(await pubsubTest({PSFloodSub}))
|
||||
@@ -1,58 +0,0 @@
|
||||
{.used.}
|
||||
|
||||
import helpers, commoninterop
|
||||
import ../libp2p
|
||||
import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/relay
|
||||
|
||||
proc switchMplexCreator(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov = proc(config: TransportConfig): Transport =
|
||||
TcpTransport.new({}, config.upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.raises: [LPError].} =
|
||||
SwitchBuilder
|
||||
.new()
|
||||
.withSignedPeerRecord(false)
|
||||
.withMaxConnections(MaxConnections)
|
||||
.withRng(crypto.newRng())
|
||||
.withAddresses(@[ma])
|
||||
.withMaxIn(-1)
|
||||
.withMaxOut(-1)
|
||||
.withTransport(prov)
|
||||
.withMplex()
|
||||
.withMaxConnsPerPeer(MaxConnectionsPerPeer)
|
||||
.withPeerStore(capacity = 1000)
|
||||
.withNoise()
|
||||
.withCircuitRelay(relay)
|
||||
.withNameResolver(nil)
|
||||
.build()
|
||||
|
||||
proc switchYamuxCreator(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov = proc(config: TransportConfig): Transport =
|
||||
TcpTransport.new({}, config.upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.raises: [LPError].} =
|
||||
SwitchBuilder
|
||||
.new()
|
||||
.withSignedPeerRecord(false)
|
||||
.withMaxConnections(MaxConnections)
|
||||
.withRng(crypto.newRng())
|
||||
.withAddresses(@[ma])
|
||||
.withMaxIn(-1)
|
||||
.withMaxOut(-1)
|
||||
.withTransport(prov)
|
||||
.withYamux()
|
||||
.withMaxConnsPerPeer(MaxConnectionsPerPeer)
|
||||
.withPeerStore(capacity = 1000)
|
||||
.withNoise()
|
||||
.withCircuitRelay(relay)
|
||||
.withNameResolver(nil)
|
||||
.build()
|
||||
|
||||
suite "Tests interop":
|
||||
commonInteropTests("mplex", switchMplexCreator)
|
||||
relayInteropTests("mplex", switchMplexCreator)
|
||||
|
||||
commonInteropTests("yamux", switchYamuxCreator)
|
||||
relayInteropTests("yamux", switchYamuxCreator)
|
||||
@@ -123,7 +123,7 @@ const
|
||||
"/ip4/127.0.0.1/ipfs", "/ip4/127.0.0.1/ipfs/tcp", "/p2p-circuit/50",
|
||||
]
|
||||
|
||||
PathVectors = ["/unix/tmp/p2pd.sock", "/unix/a/b/c/d/e/f/g/h/i.sock"]
|
||||
PathVectors = ["/unix/a/b/c/d/e/f/g/h/i.sock"]
|
||||
|
||||
PathExpects = [
|
||||
"90030E2F746D702F703270642E736F636B",
|
||||
|
||||
@@ -45,5 +45,5 @@ when defined(libp2p_autotls_support):
|
||||
import
|
||||
mix/[
|
||||
testcrypto, testcurve25519, testtagmanager, testseqnogenerator, testserialization,
|
||||
testmixmessage, testsphinx, testmultiaddr,
|
||||
testmixmessage, testsphinx, testmultiaddr, testfragmentation, testmixnode,
|
||||
]
|
||||
|
||||
@@ -28,7 +28,6 @@ import
|
||||
builders,
|
||||
upgrademngrs/upgrade,
|
||||
varint,
|
||||
daemon/daemonapi,
|
||||
]
|
||||
import ./helpers
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ import
|
||||
builders,
|
||||
protocols/ping,
|
||||
wire,
|
||||
utils/ipaddr,
|
||||
]
|
||||
|
||||
from ./helpers import suite, asyncTest, asyncTeardown, checkTrackers, skip, check
|
||||
|
||||
Reference in New Issue
Block a user