Compare commits

...

8 Commits

Author SHA1 Message Date
Radosław Kamiński
5f6b8e86a5 test(rendezvous): error cases (#1683) 2025-09-22 15:53:47 +01:00
richΛrd
11b98b7a3f fix: add missing import (#1707) 2025-09-22 13:38:32 +00:00
richΛrd
647f76341e feat(mix): mix_protocol and entry connection (#1703) 2025-09-22 13:09:18 +00:00
Radosław Kamiński
fbf96bb2ce chore(readme): Update chat code example link (#1709) 2025-09-22 13:28:03 +01:00
richΛrd
f0aaecb743 feat(mix): mixnode (#1702) 2025-09-21 16:46:48 +00:00
richΛrd
8d3076ea99 fix: add missing import for linux/amd64 daily job (#1706) 2025-09-20 18:39:12 -04:00
richΛrd
70b7d61436 feat(mix): SURBs and fragmentation (#1700) 2025-09-19 15:56:26 -04:00
Gabriel Cruz
37bae0986c chore: remove go daemon (#1705) 2025-09-19 15:21:23 -04:00
40 changed files with 1768 additions and 3179 deletions

View File

@@ -72,15 +72,6 @@ jobs:
shell: ${{ matrix.shell }}
nim_ref: ${{ matrix.nim.ref }}
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '~1.16.0' # That's the minimum Go version that works with arm.
- name: Install p2pd
run: |
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
- name: Restore deps from cache
id: deps-cache
uses: actions/cache@v3

View File

@@ -69,16 +69,6 @@ jobs:
nim_ref: ${{ matrix.nim.ref }}
cpu: ${{ matrix.cpu }}
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '~1.16.0'
cache: false
- name: Install p2pd
run: |
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
- name: Install dependencies (pinned)
if: ${{ inputs.pinned_deps }}
run: |

View File

@@ -47,7 +47,7 @@ nimble install libp2p
You'll find the nim-libp2p documentation [here](https://vacp2p.github.io/nim-libp2p/docs/). See [examples](./examples) for simple usage patterns.
## Getting Started
Try out the chat example. For this you'll need to have [`go-libp2p-daemon`](examples/go-daemon/daemonapi.md) running. Full code can be found [here](https://github.com/status-im/nim-libp2p/blob/master/examples/chat.nim):
Try out the chat example. Full code can be found [here](https://github.com/vacp2p/nim-libp2p/blob/master/examples/directchat.nim):
```bash
nim c -r --threads:on examples/directchat.nim
@@ -81,12 +81,6 @@ Run unit tests:
# run all the unit tests
nimble test
```
**Obs:** Running all tests requires the [`go-libp2p-daemon` to be installed and running](examples/go-daemon/daemonapi.md).
If you only want to run tests that don't require `go-libp2p-daemon`, use:
```
nimble testnative
```
For a list of all available test suites, use:
```
@@ -155,8 +149,6 @@ List of packages modules implemented in nim-libp2p:
| [connmanager](libp2p/connmanager.nim) | Connection manager |
| [identify / push identify](libp2p/protocols/identify.nim) | [Identify](https://docs.libp2p.io/concepts/fundamentals/protocols/#identify) protocol |
| [ping](libp2p/protocols/ping.nim) | [Ping](https://docs.libp2p.io/concepts/fundamentals/protocols/#ping) protocol |
| [libp2p-daemon-client](libp2p/daemon/daemonapi.nim) | [go-daemon](https://github.com/libp2p/go-libp2p-daemon) nim wrapper |
| [interop-libp2p](tests/testinterop.nim) | Interop tests |
| **Transports** | |
| [libp2p-tcp](libp2p/transports/tcptransport.nim) | TCP transport |
| [libp2p-ws](libp2p/transports/wstransport.nim) | WebSocket & WebSocket Secure transport |

View File

@@ -1,54 +0,0 @@
import chronos, nimcrypto, strutils
import ../../libp2p/daemon/daemonapi
import ../hexdump
const PubSubTopic = "test-net"
proc dumpSubscribedPeers(api: DaemonAPI) {.async.} =
var peers = await api.pubsubListPeers(PubSubTopic)
echo "= List of connected and subscribed peers:"
for item in peers:
echo item.pretty()
proc dumpAllPeers(api: DaemonAPI) {.async.} =
var peers = await api.listPeers()
echo "Current connected peers count = ", len(peers)
for item in peers:
echo item.peer.pretty()
proc monitor(api: DaemonAPI) {.async.} =
while true:
echo "Dumping all peers"
await dumpAllPeers(api)
await sleepAsync(5000)
proc main() {.async.} =
echo "= Starting P2P bootnode"
var api = await newDaemonApi({DHTFull, PSGossipSub})
var id = await api.identity()
echo "= P2P bootnode ", id.peer.pretty(), " started."
let mcip4 = multiCodec("ip4")
let mcip6 = multiCodec("ip6")
echo "= You can use one of this addresses to bootstrap your nodes:"
for item in id.addresses:
if item.protoCode() == mcip4 or item.protoCode() == mcip6:
echo $item & "/ipfs/" & id.peer.pretty()
asyncSpawn monitor(api)
proc pubsubLogger(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async.} =
let msglen = len(message.data)
echo "= Recieved pubsub message with length ",
msglen, " bytes from peer ", message.peer.pretty()
echo dumpHex(message.data)
await api.dumpSubscribedPeers()
result = true
var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger)
when isMainModule:
waitFor(main())
while true:
poll()

View File

@@ -1,132 +0,0 @@
import chronos, nimcrypto, strutils
import ../../libp2p/daemon/daemonapi
## nim c -r --threads:on chat.nim
when not (compileOption("threads")):
{.fatal: "Please, compile this program with the --threads:on option!".}
const ServerProtocols = @["/test-chat-stream"]
type CustomData = ref object
api: DaemonAPI
remotes: seq[StreamTransport]
consoleFd: AsyncFD
serveFut: Future[void]
proc threadMain(wfd: AsyncFD) {.thread.} =
## This procedure performs reading from `stdin` and sends data over
## pipe to main thread.
var transp = fromPipe(wfd)
while true:
var line = stdin.readLine()
let res = waitFor transp.write(line & "\r\n")
proc serveThread(udata: CustomData) {.async.} =
## This procedure perform reading on pipe and sends data to remote clients.
var transp = fromPipe(udata.consoleFd)
proc remoteReader(transp: StreamTransport) {.async.} =
while true:
var line = await transp.readLine()
if len(line) == 0:
break
echo ">> ", line
while true:
try:
var line = await transp.readLine()
if line.startsWith("/connect"):
var parts = line.split(" ")
if len(parts) == 2:
var peerId = PeerId.init(parts[1])
var address = MultiAddress.init(multiCodec("p2p-circuit"))
address &= MultiAddress.init(multiCodec("p2p"), peerId)
echo "= Searching for peer ", peerId.pretty()
var id = await udata.api.dhtFindPeer(peerId)
echo "= Peer " & parts[1] & " found at addresses:"
for item in id.addresses:
echo $item
echo "= Connecting to peer ", $address
await udata.api.connect(peerId, @[address], 30)
echo "= Opening stream to peer chat ", parts[1]
var stream = await udata.api.openStream(peerId, ServerProtocols)
udata.remotes.add(stream.transp)
echo "= Connected to peer chat ", parts[1]
asyncSpawn remoteReader(stream.transp)
elif line.startsWith("/search"):
var parts = line.split(" ")
if len(parts) == 2:
var peerId = PeerId.init(parts[1])
echo "= Searching for peer ", peerId.pretty()
var id = await udata.api.dhtFindPeer(peerId)
echo "= Peer " & parts[1] & " found at addresses:"
for item in id.addresses:
echo $item
elif line.startsWith("/consearch"):
var parts = line.split(" ")
if len(parts) == 2:
var peerId = PeerId.init(parts[1])
echo "= Searching for peers connected to peer ", parts[1]
var peers = await udata.api.dhtFindPeersConnectedToPeer(peerId)
echo "= Found ", len(peers), " connected to peer ", parts[1]
for item in peers:
var peer = item.peer
var addresses = newSeq[string]()
var relay = false
for a in item.addresses:
addresses.add($a)
if a.protoName() == "/p2p-circuit":
relay = true
break
if relay:
echo peer.pretty(), " * ", " [", addresses.join(", "), "]"
else:
echo peer.pretty(), " [", addresses.join(", "), "]"
elif line.startsWith("/exit"):
break
else:
var msg = line & "\r\n"
echo "<< ", line
var pending = newSeq[Future[int]]()
for item in udata.remotes:
pending.add(item.write(msg))
if len(pending) > 0:
var results = await all(pending)
except CatchableError as err:
echo err.msg
proc main() {.async.} =
var data = new CustomData
data.remotes = newSeq[StreamTransport]()
var (rfd, wfd) = createAsyncPipe()
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
raise newException(ValueError, "Could not initialize pipe!")
data.consoleFd = rfd
data.serveFut = serveThread(data)
var thread: Thread[AsyncFD]
thread.createThread(threadMain, wfd)
echo "= Starting P2P node"
data.api = await newDaemonApi({DHTFull, Bootstrap})
await sleepAsync(3000)
var id = await data.api.identity()
proc streamHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
echo "= Peer ", stream.peer.pretty(), " joined chat"
data.remotes.add(stream.transp)
while true:
var line = await stream.transp.readLine()
if len(line) == 0:
break
echo ">> ", line
await data.api.addHandler(ServerProtocols, streamHandler)
echo "= Your PeerId is ", id.peer.pretty()
await data.serveFut
when isMainModule:
waitFor(main())

View File

@@ -1,43 +0,0 @@
# Table of Contents
- [Introduction](#introduction)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Script](#script)
- [Examples](#examples)
# Introduction
This is a libp2p-backed daemon wrapping the functionalities of go-libp2p for use in Nim. <br>
For more information about the go daemon, check out [this repository](https://github.com/libp2p/go-libp2p-daemon).
> **Required only** for running the tests.
# Prerequisites
Go with version `1.16.0`
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.
# Installation
Run the build script while having the `go` command pointing to the correct Go version.
```sh
./scripts/build_p2pd.sh
```
`build_p2pd.sh` will not rebuild unless needed. If you already have the newest binary and you want to force the rebuild, use:
```sh
./scripts/build_p2pd.sh -f
```
Or:
```sh
./scripts/build_p2pd.sh --force
```
If everything goes correctly, the binary (`p2pd`) should be built and placed in the `$GOPATH/bin` directory.
If you're having issues, head into [our discord](https://discord.com/channels/864066763682218004/1115526869769535629) and ask for assistance.
After successfully building the binary, remember to add it to your path so it can be found. You can do that by running:
```sh
export PATH="$PATH:$HOME/go/bin"
```
> **Tip:** To make this change permanent, add the command above to your `.bashrc` file.
# Examples
Examples can be found in the [examples folder](https://github.com/status-im/nim-libp2p/tree/readme/examples/go-daemon)

View File

@@ -1,46 +0,0 @@
import chronos, nimcrypto, strutils, os
import ../../libp2p/daemon/daemonapi
const PubSubTopic = "test-net"
proc main(bn: string) {.async.} =
echo "= Starting P2P node"
var bootnodes = bn.split(",")
var api = await newDaemonApi(
{DHTFull, PSGossipSub, WaitBootstrap}, bootstrapNodes = bootnodes, peersRequired = 1
)
var id = await api.identity()
echo "= P2P node ", id.peer.pretty(), " started:"
for item in id.addresses:
echo item
proc pubsubLogger(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async.} =
let msglen = len(message.data)
echo "= Recieved pubsub message with length ",
msglen, " bytes from peer ", message.peer.pretty(), ": "
var strdata = cast[string](message.data)
echo strdata
result = true
var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger)
# Waiting for gossipsub interval
while true:
var peers = await api.pubsubListPeers(PubSubTopic)
if len(peers) > 0:
break
await sleepAsync(1000)
var data = "HELLO\r\n"
var msgData = cast[seq[byte]](data)
await api.pubsubPublish(PubSubTopic, msgData)
when isMainModule:
if paramCount() != 1:
echo "Please supply bootnodes!"
else:
waitFor(main(paramStr(1)))
while true:
poll()

View File

@@ -49,12 +49,6 @@ proc tutorialToMd(filename: string) =
task testnative, "Runs libp2p native tests":
runTest("testnative")
task testdaemon, "Runs daemon tests":
runTest("testdaemon")
task testinterop, "Runs interop tests":
runTest("testinterop")
task testpubsub, "Runs pubsub tests":
runTest("pubsub/testpubsub", "-d:libp2p_gossipsub_1_4")

File diff suppressed because it is too large Load Diff

View File

@@ -1,156 +0,0 @@
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
## This module implements Pool of StreamTransport.
import chronos
const DefaultPoolSize* = 8 ## Default pool size
type
ConnectionFlags = enum
None
Busy
PoolItem = object
transp*: StreamTransport
flags*: set[ConnectionFlags]
PoolState = enum
Connecting
Connected
Closing
Closed
TransportPool* = ref object ## Transports pool object
transports: seq[PoolItem]
busyCount: int
state: PoolState
bufferSize: int
event: AsyncEvent
TransportPoolError* = object of AsyncError
proc waitAll[T](futs: seq[Future[T]]): Future[void] =
## Performs waiting for all Future[T].
var counter = len(futs)
var retFuture = newFuture[void]("connpool.waitAllConnections")
proc cb(udata: pointer) =
dec(counter)
if counter == 0:
retFuture.complete()
for fut in futs:
fut.addCallback(cb)
return retFuture
proc newPool*(
address: TransportAddress,
poolsize: int = DefaultPoolSize,
bufferSize = DefaultStreamBufferSize,
): Future[TransportPool] {.async: (raises: [CancelledError]).} =
## Establish pool of connections to address ``address`` with size
## ``poolsize``.
var pool = new TransportPool
pool.bufferSize = bufferSize
pool.transports = newSeq[PoolItem](poolsize)
var conns = newSeq[Future[StreamTransport]](poolsize)
pool.state = Connecting
for i in 0 ..< poolsize:
conns[i] = connect(address, bufferSize)
# Waiting for all connections to be established.
await waitAll(conns)
# Checking connections and preparing pool.
for i in 0 ..< poolsize:
if conns[i].failed:
raise conns[i].error
else:
let transp = conns[i].read()
let item = PoolItem(transp: transp)
pool.transports[i] = item
# Setup available connections event
pool.event = newAsyncEvent()
pool.state = Connected
result = pool
proc acquire*(
pool: TransportPool
): Future[StreamTransport] {.async: (raises: [CancelledError, TransportPoolError]).} =
## Acquire non-busy connection from pool ``pool``.
var transp: StreamTransport
if pool.state in {Connected}:
while true:
if pool.busyCount < len(pool.transports):
for conn in pool.transports.mitems():
if Busy notin conn.flags:
conn.flags.incl(Busy)
inc(pool.busyCount)
transp = conn.transp
break
else:
await pool.event.wait()
pool.event.clear()
if not isNil(transp):
break
else:
raise newException(TransportPoolError, "Pool is not ready!")
result = transp
proc release*(
pool: TransportPool, transp: StreamTransport
) {.async: (raises: [TransportPoolError]).} =
## Release connection ``transp`` back to pool ``pool``.
if pool.state in {Connected, Closing}:
var found = false
for conn in pool.transports.mitems():
if conn.transp == transp:
conn.flags.excl(Busy)
dec(pool.busyCount)
pool.event.fire()
found = true
break
if not found:
raise newException(TransportPoolError, "Transport not bound to pool!")
else:
raise newException(TransportPoolError, "Pool is not ready!")
proc join*(
pool: TransportPool
) {.async: (raises: [TransportPoolError, CancelledError]).} =
## Waiting for all connection to become available.
if pool.state in {Connected, Closing}:
while true:
if pool.busyCount == 0:
break
else:
await pool.event.wait()
pool.event.clear()
elif pool.state == Connecting:
raise newException(TransportPoolError, "Pool is not ready!")
proc close*(
pool: TransportPool
) {.async: (raises: [TransportPoolError, CancelledError]).} =
## Closes transports pool ``pool`` and release all resources.
if pool.state == Connected:
pool.state = Closing
# Waiting for all transports to become available.
await pool.join()
# Closing all transports
var pending = newSeq[Future[void]](len(pool.transports))
for i in 0 ..< len(pool.transports):
let transp = pool.transports[i].transp
transp.close()
pending[i] = transp.join()
# Waiting for all transports to be closed
await waitAll(pending)
# Mark pool as `Closed`.
pool.state = Closed

View File

@@ -93,6 +93,10 @@ proc parseFullAddress*(ma: MultiAddress): MaResult[(PeerId, MultiAddress)] =
proc parseFullAddress*(ma: string | seq[byte]): MaResult[(PeerId, MultiAddress)] =
parseFullAddress(?MultiAddress.init(ma))
proc toFullAddress*(peerId: PeerId, ma: MultiAddress): MaResult[MultiAddress] =
let peerIdPart = ?MultiAddress.init(multiCodec("p2p"), peerId.data)
concat(ma, peerIdPart)
proc new*(
p: typedesc[PeerInfo],
key: PrivateKey,

View File

@@ -0,0 +1,133 @@
import hashes, chronos, stew/byteutils, results, chronicles
import ../../stream/connection
import ../../varint
import ../../utils/sequninit
import ./mix_protocol
from fragmentation import DataSize
type MixDialer* = proc(
msg: seq[byte], codec: string, destination: MixDestination
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).}
type MixEntryConnection* = ref object of Connection
destination: MixDestination
codec: string
mixDialer: MixDialer
func shortLog*(conn: MixEntryConnection): string =
if conn == nil:
"MixEntryConnection(nil)"
else:
"MixEntryConnection(" & $conn.destination & ")"
chronicles.formatIt(MixEntryConnection):
shortLog(it)
method readOnce*(
s: MixEntryConnection, pbytes: pointer, nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method readExactly*(
s: MixEntryConnection, pbytes: pointer, nbytes: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method readLine*(
s: MixEntryConnection, limit = 0, sep = "\r\n"
): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method readVarint*(
conn: MixEntryConnection
): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method readLp*(
s: MixEntryConnection, maxSize: int
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method write*(
self: MixEntryConnection, msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.mixDialer(msg, self.codec, self.destination)
proc write*(
self: MixEntryConnection, msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.write(msg.toBytes())
method writeLp*(
self: MixEntryConnection, msg: openArray[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
if msg.len() > DataSize:
let fut = newFuture[void]()
fut.fail(
newException(LPStreamError, "exceeds max msg size of " & $DataSize & " bytes")
)
return fut
## Write `msg` with a varint-encoded length prefix
let vbytes = PB.toBytes(msg.len().uint64)
var buf = newSeqUninit[byte](msg.len() + vbytes.len)
buf[0 ..< vbytes.len] = vbytes.toOpenArray()
buf[vbytes.len ..< buf.len] = msg
self.write(buf)
method writeLp*(
self: MixEntryConnection, msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.writeLp(msg.toOpenArrayByte(0, msg.high))
proc shortLog*(self: MixEntryConnection): string {.raises: [].} =
"[MixEntryConnection] Destination: " & $self.destination
method closeImpl*(
self: MixEntryConnection
): Future[void] {.async: (raises: [], raw: true).} =
let fut = newFuture[void]()
fut.complete()
return fut
func hash*(self: MixEntryConnection): Hash =
hash($self.destination)
when defined(libp2p_agents_metrics):
proc setShortAgent*(self: MixEntryConnection, shortAgent: string) =
discard
proc new*(
T: typedesc[MixEntryConnection],
srcMix: MixProtocol,
destination: MixDestination,
codec: string,
): T {.raises: [].} =
var instance = T()
instance.destination = destination
instance.codec = codec
instance.mixDialer = proc(
msg: seq[byte], codec: string, dest: MixDestination
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
await srcMix.anonymizeLocalProtocolSend(
nil, msg, codec, dest, 0 # TODO: set incoming queue for replies and surbs
)
when defined(libp2p_agents_metrics):
instance.shortAgent = connection.shortAgent
instance
proc toConnection*(
srcMix: MixProtocol, destination: MixDestination, codec: string
): Result[Connection, string] {.gcsafe, raises: [].} =
## Create a stream to send and optionally receive responses.
## Under the hood it will wrap the message in a sphinx packet
## and send it via a random mix path.
ok(MixEntryConnection.new(srcMix, destination, codec))

View File

@@ -0,0 +1,36 @@
import chronicles, chronos, metrics
import ../../builders
import ../../stream/connection
import ./mix_metrics
type ExitLayer* = object
switch: Switch
proc init*(T: typedesc[ExitLayer], switch: Switch): T =
ExitLayer(switch: switch)
proc onMessage*(
self: ExitLayer,
codec: string,
message: seq[byte],
destAddr: MultiAddress,
destPeerId: PeerId,
) {.async: (raises: [CancelledError]).} =
# If dialing destination fails, no response is returned to
# the sender, so, flow can just end here. Only log errors
# for now
# https://github.com/vacp2p/mix/issues/86
try:
let destConn = await self.switch.dial(destPeerId, @[destAddr], codec)
defer:
await destConn.close()
await destConn.write(message)
except LPStreamError as exc:
error "Stream error while writing to next hop: ", err = exc.msg
mix_messages_error.inc(labelValues = ["ExitLayer", "LPSTREAM_ERR"])
except DialFailedError as exc:
error "Failed to dial next hop: ", err = exc.msg
mix_messages_error.inc(labelValues = ["ExitLayer", "DIAL_FAILED"])
except CancelledError as exc:
raise exc

View File

@@ -0,0 +1,95 @@
import ./[serialization, seqno_generator]
import results, stew/endians2
import ../../peerid
const PaddingLengthSize* = 2
const SeqNoSize* = 4
const DataSize* = MessageSize - PaddingLengthSize - SeqNoSize
# Unpadding and reassembling messages will be handled by the top-level applications.
# Although padding and splitting messages could also be managed at that level, we
# implement it here to clarify the sender's logic.
# This is crucial as the sender is responsible for wrapping messages in Sphinx packets.
type MessageChunk* = object
paddingLength: uint16
data: seq[byte]
seqNo: uint32
proc init*(
T: typedesc[MessageChunk], paddingLength: uint16, data: seq[byte], seqNo: uint32
): T =
T(paddingLength: paddingLength, data: data, seqNo: seqNo)
proc get*(msgChunk: MessageChunk): (uint16, seq[byte], uint32) =
(msgChunk.paddingLength, msgChunk.data, msgChunk.seqNo)
proc serialize*(msgChunk: MessageChunk): seq[byte] =
let
paddingBytes = msgChunk.paddingLength.toBytesBE()
seqNoBytes = msgChunk.seqNo.toBytesBE()
doAssert msgChunk.data.len == DataSize,
"Padded data must be exactly " & $DataSize & " bytes"
return @paddingBytes & msgChunk.data & @seqNoBytes
proc deserialize*(T: typedesc[MessageChunk], data: openArray[byte]): Result[T, string] =
if data.len != MessageSize:
return err("Data must be exactly " & $MessageSize & " bytes")
let
paddingLength = uint16.fromBytesBE(data[0 .. PaddingLengthSize - 1])
chunk = data[PaddingLengthSize .. (PaddingLengthSize + DataSize - 1)]
seqNo = uint32.fromBytesBE(data[PaddingLengthSize + DataSize ..^ 1])
ok(T(paddingLength: paddingLength, data: chunk, seqNo: seqNo))
proc ceilDiv*(a, b: int): int =
(a + b - 1) div b
proc addPadding*(messageBytes: seq[byte], seqNo: SeqNo): MessageChunk =
## Pads messages smaller than DataSize
let paddingLength = uint16(DataSize - messageBytes.len)
let paddedData =
if paddingLength > 0:
let paddingBytes = newSeq[byte](paddingLength)
paddingBytes & messageBytes
else:
messageBytes
MessageChunk(paddingLength: paddingLength, data: paddedData, seqNo: seqNo)
proc addPadding*(messageBytes: seq[byte], peerId: PeerId): MessageChunk =
## Pads messages smaller than DataSize
var seqNoGen = SeqNo.init(peerId)
seqNoGen.generate(messageBytes)
messageBytes.addPadding(seqNoGen)
proc removePadding*(msgChunk: MessageChunk): Result[seq[byte], string] =
let msgLength = len(msgChunk.data) - int(msgChunk.paddingLength)
if msgLength < 0:
return err("Invalid padding length")
ok(msgChunk.data[msgChunk.paddingLength ..^ 1])
proc padAndChunkMessage*(messageBytes: seq[byte], peerId: PeerId): seq[MessageChunk] =
var seqNoGen = SeqNo.init(peerId)
seqNoGen.generate(messageBytes)
var chunks: seq[MessageChunk] = @[]
# Split to chunks
let totalChunks = max(1, ceilDiv(messageBytes.len, DataSize))
# Ensure at least one chunk is generated
for i in 0 ..< totalChunks:
let
startIdx = i * DataSize
endIdx = min(startIdx + DataSize, messageBytes.len)
chunkData = messageBytes[startIdx .. endIdx - 1]
msgChunk = chunkData.addPadding(seqNoGen)
chunks.add(msgChunk)
seqNoGen.inc()
return chunks

View File

@@ -0,0 +1,13 @@
{.push raises: [].}
import metrics
declarePublicCounter mix_messages_recvd, "number of mix messages received", ["type"]
declarePublicCounter mix_messages_forwarded,
"number of mix messages forwarded", ["type"]
declarePublicCounter mix_messages_error,
"number of mix messages failed processing", ["type", "error"]
declarePublicGauge mix_pool_size, "number of nodes in the pool"

View File

@@ -0,0 +1,318 @@
import os, results, strformat, sugar, sequtils
import std/streams
import ../../crypto/[crypto, curve25519, secp]
import ../../[multiaddress, multicodec, peerid, peerinfo]
import ./[serialization, curve25519, multiaddr]
const MixNodeInfoSize* =
AddrSize + (2 * FieldElementSize) + (SkRawPublicKeySize + SkRawPrivateKeySize)
const MixPubInfoSize* = AddrSize + FieldElementSize + SkRawPublicKeySize
type MixNodeInfo* = object
peerId*: PeerId
multiAddr*: MultiAddress
mixPubKey*: FieldElement
mixPrivKey*: FieldElement
libp2pPubKey*: SkPublicKey
libp2pPrivKey*: SkPrivateKey
proc initMixNodeInfo*(
peerId: PeerId,
multiAddr: MultiAddress,
mixPubKey, mixPrivKey: FieldElement,
libp2pPubKey: SkPublicKey,
libp2pPrivKey: SkPrivateKey,
): MixNodeInfo =
MixNodeInfo(
peerId: peerId,
multiAddr: multiAddr,
mixPubKey: mixPubKey,
mixPrivKey: mixPrivKey,
libp2pPubKey: libp2pPubKey,
libp2pPrivKey: libp2pPrivKey,
)
proc get*(
info: MixNodeInfo
): (PeerId, MultiAddress, FieldElement, FieldElement, SkPublicKey, SkPrivateKey) =
(
info.peerId, info.multiAddr, info.mixPubKey, info.mixPrivKey, info.libp2pPubKey,
info.libp2pPrivKey,
)
proc serialize*(nodeInfo: MixNodeInfo): Result[seq[byte], string] =
let addrBytes = multiAddrToBytes(nodeInfo.peerId, nodeInfo.multiAddr).valueOr:
return err("Error in multiaddress conversion to bytes: " & error)
let
mixPubKeyBytes = fieldElementToBytes(nodeInfo.mixPubKey)
mixPrivKeyBytes = fieldElementToBytes(nodeInfo.mixPrivKey)
libp2pPubKeyBytes = nodeInfo.libp2pPubKey.getBytes()
libp2pPrivKeyBytes = nodeInfo.libp2pPrivKey.getBytes()
return ok(
addrBytes & mixPubKeyBytes & mixPrivKeyBytes & libp2pPubKeyBytes & libp2pPrivKeyBytes
)
proc deserialize*(T: typedesc[MixNodeInfo], data: openArray[byte]): Result[T, string] =
if len(data) != MixNodeInfoSize:
return
err("Serialized Mix node info must be exactly " & $MixNodeInfoSize & " bytes")
let (peerId, multiAddr) = bytesToMultiAddr(data[0 .. AddrSize - 1]).valueOr:
return err("Error in multiaddress conversion to bytes: " & error)
let mixPubKey = bytesToFieldElement(
data[AddrSize .. (AddrSize + FieldElementSize - 1)]
).valueOr:
return err("Mix public key deserialize error: " & error)
let mixPrivKey = bytesToFieldElement(
data[(AddrSize + FieldElementSize) .. (AddrSize + (2 * FieldElementSize) - 1)]
).valueOr:
return err("Mix private key deserialize error: " & error)
let libp2pPubKey = SkPublicKey.init(
data[
AddrSize + (2 * FieldElementSize) ..
AddrSize + (2 * FieldElementSize) + SkRawPublicKeySize - 1
]
).valueOr:
return err("Failed to initialize libp2p public key")
let libp2pPrivKey = SkPrivateKey.init(
data[AddrSize + (2 * FieldElementSize) + SkRawPublicKeySize ..^ 1]
).valueOr:
return err("Failed to initialize libp2p private key")
ok(
T(
peerId: peerId,
multiAddr: multiAddr,
mixPubKey: mixPubKey,
mixPrivKey: mixPrivKey,
libp2pPubKey: libp2pPubKey,
libp2pPrivKey: libp2pPrivKey,
)
)
proc writeToFile*(
node: MixNodeInfo, index: int, nodeInfoFolderPath: string = "./nodeInfo"
): Result[void, string] =
if not dirExists(nodeInfoFolderPath):
createDir(nodeInfoFolderPath)
let filename = nodeInfoFolderPath / fmt"mixNode_{index}"
var file = newFileStream(filename, fmWrite)
if file == nil:
return err("Failed to create file stream for " & filename)
defer:
file.close()
let serializedData = node.serialize().valueOr:
return err("Failed to serialize mix node info: " & error)
file.writeData(addr serializedData[0], serializedData.len)
return ok()
proc readFromFile*(
T: typedesc[MixNodeInfo], index: int, nodeInfoFolderPath: string = "./nodeInfo"
): Result[T, string] =
let filename = nodeInfoFolderPath / fmt"mixNode_{index}"
if not fileExists(filename):
return err("File does not exist")
var file = newFileStream(filename, fmRead)
if file == nil:
return err(
"Failed to open file: " & filename &
". Check permissions or if the path is correct."
)
defer:
file.close()
let data = ?file.readAll().catch().mapErr(x => "File read error: " & x.msg)
if data.len != MixNodeInfoSize:
return err(
"Invalid data size for MixNodeInfo: expected " & $MixNodeInfoSize &
" bytes, but got " & $(data.len) & " bytes."
)
let dMixNodeInfo = MixNodeInfo.deserialize(cast[seq[byte]](data)).valueOr:
return err("Mix node info deserialize error: " & error)
return ok(dMixNodeInfo)
proc deleteNodeInfoFolder*(nodeInfoFolderPath: string = "./nodeInfo") =
## Deletes the folder that stores serialized mix node info files
## along with all its contents, if the folder exists.
if dirExists(nodeInfoFolderPath):
removeDir(nodeInfoFolderPath)
type MixPubInfo* = object
peerId*: PeerId
multiAddr*: MultiAddress
mixPubKey*: FieldElement
libp2pPubKey*: SkPublicKey
proc init*(
T: typedesc[MixPubInfo],
peerId: PeerId,
multiAddr: MultiAddress,
mixPubKey: FieldElement,
libp2pPubKey: SkPublicKey,
): T =
T(
peerId: PeerId,
multiAddr: multiAddr,
mixPubKey: mixPubKey,
libp2pPubKey: libp2pPubKey,
)
proc get*(info: MixPubInfo): (PeerId, MultiAddress, FieldElement, SkPublicKey) =
(info.peerId, info.multiAddr, info.mixPubKey, info.libp2pPubKey)
proc serialize*(nodeInfo: MixPubInfo): Result[seq[byte], string] =
let addrBytes = multiAddrToBytes(nodeInfo.peerId, nodeInfo.multiAddr).valueOr:
return err("Error in multiaddress conversion to bytes: " & error)
let
mixPubKeyBytes = fieldElementToBytes(nodeInfo.mixPubKey)
libp2pPubKeyBytes = nodeInfo.libp2pPubKey.getBytes()
return ok(addrBytes & mixPubKeyBytes & libp2pPubKeyBytes)
proc deserialize*(T: typedesc[MixPubInfo], data: openArray[byte]): Result[T, string] =
if len(data) != MixPubInfoSize:
return
err("Serialized mix public info must be exactly " & $MixPubInfoSize & " bytes")
let (peerId, multiAddr) = bytesToMultiAddr(data[0 .. AddrSize - 1]).valueOr:
return err("Error in bytes to multiaddress conversion: " & error)
let mixPubKey = bytesToFieldElement(
data[AddrSize .. (AddrSize + FieldElementSize - 1)]
).valueOr:
return err("Mix public key deserialize error: " & error)
let libp2pPubKey = SkPublicKey.init(data[(AddrSize + FieldElementSize) ..^ 1]).valueOr:
return err("Failed to initialize libp2p public key: ")
ok(
MixPubInfo(
peerId: peerId,
multiAddr: multiAddr,
mixPubKey: mixPubKey,
libp2pPubKey: libp2pPubKey,
)
)
proc writeToFile*(
node: MixPubInfo, index: int, pubInfoFolderPath: string = "./pubInfo"
): Result[void, string] =
if not dirExists(pubInfoFolderPath):
createDir(pubInfoFolderPath)
let filename = pubInfoFolderPath / fmt"mixNode_{index}"
var file = newFileStream(filename, fmWrite)
if file == nil:
return err("Failed to create file stream for " & filename)
defer:
file.close()
let serializedData = node.serialize().valueOr:
return err("Failed to serialize mix pub info: " & error)
file.writeData(unsafeAddr serializedData[0], serializedData.len)
return ok()
proc readFromFile*(
T: typedesc[MixPubInfo], index: int, pubInfoFolderPath: string = "./pubInfo"
): Result[T, string] =
let filename = pubInfoFolderPath / fmt"mixNode_{index}"
if not fileExists(filename):
return err("File does not exist")
var file = newFileStream(filename, fmRead)
if file == nil:
return err(
"Failed to open file: " & filename &
". Check permissions or if the path is correct."
)
defer:
file.close()
let data = ?file.readAll().catch().mapErr(x => "File read error: " & x.msg)
if data.len != MixPubInfoSize:
return err(
"Invalid data size for MixNodeInfo: expected " & $MixNodeInfoSize &
" bytes, but got " & $(data.len) & " bytes."
)
let dMixPubInfo = MixPubInfo.deserialize(cast[seq[byte]](data)).valueOr:
return err("Mix pub info deserialize error: " & error)
return ok(dMixPubInfo)
proc deletePubInfoFolder*(pubInfoFolderPath: string = "./pubInfo") =
## Deletes the folder containing serialized public mix node info
## and all files inside it, if the folder exists.
if dirExists(pubInfoFolderPath):
removeDir(pubInfoFolderPath)
type MixNodes* = seq[MixNodeInfo]
proc getMixPubInfoByIndex*(self: MixNodes, index: int): Result[MixPubInfo, string] =
if index < 0 or index >= self.len:
return err("Index must be between 0 and " & $(self.len))
ok(
MixPubInfo(
peerId: self[index].peerId,
multiAddr: self[index].multiAddr,
mixPubKey: self[index].mixPubKey,
libp2pPubKey: self[index].libp2pPubKey,
)
)
proc generateMixNodes(
count: int, basePort: int = 4242, rng: ref HmacDrbgContext = newRng()
): Result[MixNodes, string] =
var nodes = newSeq[MixNodeInfo](count)
for i in 0 ..< count:
let keyPairResult = generateKeyPair()
if keyPairResult.isErr:
return err("Generate key pair error: " & $keyPairResult.error)
let (mixPrivKey, mixPubKey) = keyPairResult.get()
let
rng = newRng()
keyPair = SkKeyPair.random(rng[])
pubKeyProto = PublicKey(scheme: Secp256k1, skkey: keyPair.pubkey)
peerId = PeerId.init(pubKeyProto).get()
multiAddr =
?MultiAddress.init(fmt"/ip4/0.0.0.0/tcp/{basePort + i}").tryGet().catch().mapErr(
x => x.msg
)
nodes[i] = MixNodeInfo(
peerId: peerId,
multiAddr: multiAddr,
mixPubKey: mixPubKey,
mixPrivKey: mixPrivKey,
libp2pPubKey: keyPair.pubkey,
libp2pPrivKey: keyPair.seckey,
)
ok(nodes)
proc initializeMixNodes*(count: int, basePort: int = 4242): Result[MixNodes, string] =
## Creates and initializes a set of mix nodes
let mixNodes = generateMixNodes(count, basePort).valueOr:
return err("Mix node initialization error: " & error)
return ok(mixNodes)
proc findByPeerId*(self: MixNodes, peerId: PeerId): Result[MixNodeInfo, string] =
let filteredNodes = self.filterIt(it.peerId == peerId)
if filteredNodes.len != 0:
return ok(filteredNodes[0])
return err("No node with peer id: " & $peerId)
proc initMixMultiAddrByIndex*(
self: var MixNodes, index: int, peerId: PeerId, multiAddr: MultiAddress
): Result[void, string] =
if index < 0 or index >= self.len:
return err("Index must be between 0 and " & $(self.len))
self[index].multiAddr = multiAddr
self[index].peerId = peerId
ok()

View File

@@ -0,0 +1,392 @@
import chronicles, chronos, sequtils, strutils, os, results
import std/[strformat, tables], metrics
import
./[
curve25519, fragmentation, mix_message, mix_node, sphinx, serialization,
tag_manager, mix_metrics, exit_layer, multiaddr,
]
import stew/endians2
import ../protocol
import ../../stream/[connection, lpstream]
import ../../[switch, multicodec, peerinfo]
const MixProtocolID* = "/mix/1.0.0"
## Mix Protocol defines a decentralized anonymous message routing layer for libp2p networks.
## It enables sender anonymity by routing each message through a decentralized mix overlay
## network composed of participating libp2p nodes, known as mix nodes. Each message is
## routed independently in a stateless manner, allowing other libp2p protocols to selectively
## anonymize messages without modifying their core protocol behavior.
type MixProtocol* = ref object of LPProtocol
mixNodeInfo: MixNodeInfo
pubNodeInfo: Table[PeerId, MixPubInfo]
switch: Switch
tagManager: TagManager
exitLayer: ExitLayer
rng: ref HmacDrbgContext
proc loadAllButIndexMixPubInfo*(
index, numNodes: int, pubInfoFolderPath: string = "./pubInfo"
): Result[Table[PeerId, MixPubInfo], string] =
var pubInfoTable = initTable[PeerId, MixPubInfo]()
for i in 0 ..< numNodes:
if i == index:
continue
let pubInfo = MixPubInfo.readFromFile(i, pubInfoFolderPath).valueOr:
return err("Failed to load pub info from file: " & error)
pubInfoTable[pubInfo.peerId] = pubInfo
return ok(pubInfoTable)
proc cryptoRandomInt(rng: ref HmacDrbgContext, max: int): Result[int, string] =
if max == 0:
return err("Max cannot be zero.")
let res = rng[].generate(uint64) mod uint64(max)
ok(res.int)
proc handleMixNodeConnection(
mixProto: MixProtocol, conn: Connection
) {.async: (raises: [LPStreamError, CancelledError]).} =
let receivedBytes =
try:
await conn.readLp(PacketSize)
except CancelledError as exc:
raise exc
finally:
await conn.close()
if receivedBytes.len == 0:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "NO_DATA"])
return # No data, end of stream
# Process the packet
let (peerId, multiAddr, _, mixPrivKey, _, _) = mixProto.mixNodeInfo.get()
let sphinxPacket = SphinxPacket.deserialize(receivedBytes).valueOr:
error "Sphinx packet deserialization error", err = error
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_SPHINX"])
return
let processedSP = processSphinxPacket(sphinxPacket, mixPrivKey, mixProto.tagManager).valueOr:
error "Failed to process Sphinx packet", err = error
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_SPHINX"])
return
case processedSP.status
of Exit:
mix_messages_recvd.inc(labelValues = ["Exit"])
# This is the exit node, forward to destination
let msgChunk = MessageChunk.deserialize(processedSP.messageChunk).valueOr:
error "Deserialization failed", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
return
let unpaddedMsg = msgChunk.removePadding().valueOr:
error "Unpadding message failed", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
return
let deserialized = MixMessage.deserialize(unpaddedMsg).valueOr:
error "Deserialization failed", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
return
if processedSP.destination == Hop():
error "no destination available"
mix_messages_error.inc(labelValues = ["Exit", "NO_DESTINATION"])
return
let destBytes = processedSP.destination.get()
let (destPeerId, destAddr) = bytesToMultiAddr(destBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
trace "Exit node - Received mix message",
peerId,
message = deserialized.message,
codec = deserialized.codec,
to = destPeerId
await mixProto.exitLayer.onMessage(
deserialized.codec, deserialized.message, destAddr, destPeerId
)
mix_messages_forwarded.inc(labelValues = ["Exit"])
of Reply:
# TODO: implement
discard
of Intermediate:
trace "# Intermediate: ", peerId, multiAddr
# Add delay
mix_messages_recvd.inc(labelValues = ["Intermediate"])
await sleepAsync(milliseconds(processedSP.delayMs))
# Forward to next hop
let nextHopBytes = processedSP.nextHop.get()
let (nextPeerId, nextAddr) = bytesToMultiAddr(nextHopBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["Intermediate", "INVALID_DEST"])
return
try:
let nextHopConn =
await mixProto.switch.dial(nextPeerId, @[nextAddr], MixProtocolID)
defer:
await nextHopConn.close()
await nextHopConn.writeLp(processedSP.serializedSphinxPacket)
mix_messages_forwarded.inc(labelValues = ["Intermediate"])
except CancelledError as exc:
raise exc
except DialFailedError as exc:
error "Failed to dial next hop: ", err = exc.msg
mix_messages_error.inc(labelValues = ["Intermediate", "DIAL_FAILED"])
except LPStreamError as exc:
error "Failed to write to next hop: ", err = exc.msg
mix_messages_error.inc(labelValues = ["Intermediate", "DIAL_FAILED"])
of Duplicate:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "DUPLICATE"])
of InvalidMAC:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_MAC"])
proc getMaxMessageSizeForCodec*(
codec: string, numberOfSurbs: uint8 = 0
): Result[int, string] =
## Computes the maximum payload size (in bytes) available for a message when encoded
## with the given `codec`
## Returns an error if the codec length would cause it to exceeds the data capacity.
let serializedMsg = MixMessage.init(@[], codec).serialize()
if serializedMsg.len > DataSize:
return err("cannot encode messages for this codec")
return ok(DataSize - serializedMsg.len)
proc sendPacket(
mixProto: MixProtocol,
multiAddrs: MultiAddress,
sphinxPacket: seq[byte],
label: string,
) {.async: (raises: [CancelledError]).} =
## Send the wrapped message to the first mix node in the selected path
let (firstMixPeerId, firstMixAddr) = parseFullAddress(multiAddrs).valueOr:
error "Invalid multiaddress", err = error
mix_messages_error.inc(labelValues = [label, "NON_RECOVERABLE"])
return
try:
let nextHopConn =
await mixProto.switch.dial(firstMixPeerId, @[firstMixAddr], @[MixProtocolID])
defer:
await nextHopConn.close()
await nextHopConn.writeLp(sphinxPacket)
except DialFailedError as exc:
error "Failed to dial next hop: ",
peerId = firstMixPeerId, address = firstMixAddr, err = exc.msg
mix_messages_error.inc(labelValues = [label, "SEND_FAILED"])
except LPStreamError as exc:
error "Failed to write to next hop: ",
peerId = firstMixPeerId, address = firstMixAddr, err = exc.msg
mix_messages_error.inc(labelValues = [label, "SEND_FAILED"])
except CancelledError as exc:
raise exc
mix_messages_forwarded.inc(labelValues = ["Entry"])
proc buildMessage(
msg: seq[byte], codec: string, peerId: PeerId
): Result[Message, (string, string)] =
let
mixMsg = MixMessage.init(msg, codec)
serialized = mixMsg.serialize()
if serialized.len > DataSize:
return err(("message size exceeds maximum payload size", "INVALID_SIZE"))
let
paddedMsg = addPadding(serialized, peerId)
serializedMsgChunk = paddedMsg.serialize()
ok(serializedMsgChunk)
## Represents the final target of a mixnet message.
## contains the peer id and multiaddress of the destination node.
type MixDestination* = object
peerId: PeerId
address: MultiAddress
proc init*(T: typedesc[MixDestination], peerId: PeerId, address: MultiAddress): T =
## Initializes a destination object with the given peer id and multiaddress.
T(peerId: peerId, address: address)
proc `$`*(d: MixDestination): string =
$d.address & "/p2p/" & $d.peerId
proc anonymizeLocalProtocolSend*(
mixProto: MixProtocol,
incoming: AsyncQueue[seq[byte]],
msg: seq[byte],
codec: string,
destination: MixDestination,
numSurbs: uint8,
) {.async: (raises: [CancelledError, LPStreamError]).} =
mix_messages_recvd.inc(labelValues = ["Entry"])
var
multiAddrs: seq[MultiAddress] = @[]
publicKeys: seq[FieldElement] = @[]
hop: seq[Hop] = @[]
delay: seq[seq[byte]] = @[]
exitPeerId: PeerId
# Select L mix nodes at random
let numMixNodes = mixProto.pubNodeInfo.len
var numAvailableNodes = numMixNodes
debug "Destination data", destination
if mixProto.pubNodeInfo.hasKey(destination.peerId):
numAvailableNodes = numMixNodes - 1
if numAvailableNodes < PathLength:
error "No. of public mix nodes less than path length.",
numMixNodes = numAvailableNodes, pathLength = PathLength
mix_messages_error.inc(labelValues = ["Entry", "LOW_MIX_POOL"])
return
# Skip the destination peer
var pubNodeInfoKeys =
mixProto.pubNodeInfo.keys.toSeq().filterIt(it != destination.peerId)
var availableIndices = toSeq(0 ..< pubNodeInfoKeys.len)
var i = 0
while i < PathLength:
let randomIndexPosition = cryptoRandomInt(mixProto.rng, availableIndices.len).valueOr:
error "Failed to generate random number", err = error
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
return
let selectedIndex = availableIndices[randomIndexPosition]
let randPeerId = pubNodeInfoKeys[selectedIndex]
availableIndices.del(randomIndexPosition)
# Last hop will be the exit node that will forward the request
if i == PathLength - 1:
exitPeerId = randPeerId
debug "Selected mix node: ", indexInPath = i, peerId = randPeerId
# Extract multiaddress, mix public key, and hop
let (peerId, multiAddr, mixPubKey, _) =
mixProto.pubNodeInfo.getOrDefault(randPeerId).get()
multiAddrs.add(multiAddr)
publicKeys.add(mixPubKey)
let multiAddrBytes = multiAddrToBytes(peerId, multiAddr).valueOr:
error "Failed to convert multiaddress to bytes", err = error
mix_messages_error.inc(labelValues = ["Entry", "INVALID_MIX_INFO"])
#TODO: should we skip and pick a different node here??
return
hop.add(Hop.init(multiAddrBytes))
# Compute delay
let delayMillisec =
if i != PathLength - 1:
cryptoRandomInt(mixProto.rng, 3).valueOr:
error "Failed to generate random number", err = error
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
return
else:
0 # Last hop does not require a delay
delay.add(@(delayMillisec.uint16.toBytesBE()))
i = i + 1
#Encode destination
let destAddrBytes = multiAddrToBytes(destination.peerId, destination.address).valueOr:
error "Failed to convert multiaddress to bytes", err = error
mix_messages_error.inc(labelValues = ["Entry", "INVALID_DEST"])
return
let destHop = Hop.init(destAddrBytes)
let message = buildMessage(msg, codec, mixProto.mixNodeInfo.peerId).valueOr:
error "Error building message", err = error[0]
mix_messages_error.inc(labelValues = ["Entry", error[1]])
return
# Wrap in Sphinx packet
let sphinxPacket = wrapInSphinxPacket(message, publicKeys, delay, hop, destHop).valueOr:
error "Failed to wrap in sphinx packet", err = error
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
return
# Send the wrapped message to the first mix node in the selected path
await mixProto.sendPacket(multiAddrs[0], sphinxPacket, "Entry")
proc init*(
mixProto: MixProtocol,
mixNodeInfo: MixNodeInfo,
pubNodeInfo: Table[PeerId, MixPubInfo],
switch: Switch,
tagManager: TagManager = TagManager.new(),
rng: ref HmacDrbgContext = newRng(),
) =
mixProto.mixNodeInfo = mixNodeInfo
mixProto.pubNodeInfo = pubNodeInfo
mixProto.switch = switch
mixProto.tagManager = tagManager
mixProto.exitLayer = ExitLayer.init(switch)
mixProto.codecs = @[MixProtocolID]
mixProto.rng = rng
mixProto.handler = proc(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
await mixProto.handleMixNodeConnection(conn)
except LPStreamError as e:
debug "Stream error", conn = conn, err = e.msg
proc new*(
T: typedesc[MixProtocol],
mixNodeInfo: MixNodeInfo,
pubNodeInfo: Table[PeerId, MixPubInfo],
switch: Switch,
tagManager: TagManager = TagManager.new(),
rng: ref HmacDrbgContext = newRng(),
): T =
let mixProto = new(T)
mixProto.init(mixNodeInfo, pubNodeInfo, switch)
mixProto
proc new*(
T: typedesc[MixProtocol],
index, numNodes: int,
switch: Switch,
nodeFolderInfoPath: string = ".",
rng: ref HmacDrbgContext = newRng(),
): Result[T, string] =
## Constructs a new `MixProtocol` instance for the mix node at `index`,
## loading its private info from `nodeInfo` and the public info of all other nodes from `pubInfo`.
let mixNodeInfo = MixNodeInfo.readFromFile(index, nodeFolderInfoPath / fmt"nodeInfo").valueOr:
return err("Failed to load mix node info for index " & $index & " - err: " & error)
let pubNodeInfo = loadAllButIndexMixPubInfo(
index, numNodes, nodeFolderInfoPath / fmt"pubInfo"
).valueOr:
return err("Failed to load mix pub info for index " & $index & " - err: " & error)
let mixProto =
MixProtocol.new(mixNodeInfo, pubNodeInfo, switch, TagManager.new(), rng)
return ok(mixProto)
proc setNodePool*(
mixProtocol: MixProtocol, mixNodeTable: Table[PeerId, MixPubInfo]
) {.gcsafe, raises: [].} =
mixProtocol.pubNodeInfo = mixNodeTable
proc getNodePoolSize*(mixProtocol: MixProtocol): int {.gcsafe, raises: [].} =
mixProtocol.pubNodeInfo.len

View File

@@ -1,17 +1,17 @@
import results, sugar, sequtils, strutils
import ./serialization
import stew/[base58, endians2]
import stew/endians2
import ../../[multicodec, multiaddress, peerid]
const
PeerIdByteLen = 39 # ed25519 and secp256k1 multihash length
MinMultiAddrComponentLen = 3
MaxMultiAddrComponentLen = 6 # quic + circuit relay
MinMultiAddrComponentLen = 2
MaxMultiAddrComponentLen = 5 # quic + circuit relay
# TODO: Add support for ipv6, dns, dns4, ws/wss/sni support
proc multiAddrToBytes*(
multiAddr: MultiAddress
peerId: PeerId, multiAddr: MultiAddress
): Result[seq[byte], string] {.raises: [].} =
var ma = multiAddr
let sma = multiAddr.items().toSeq()
@@ -22,7 +22,7 @@ proc multiAddrToBytes*(
# Only IPV4 is supported
let isCircuitRelay = ?ma.contains(multiCodec("p2p-circuit"))
let baseP2PEndIdx = if isCircuitRelay: 4 else: 2
let baseP2PEndIdx = if isCircuitRelay: 3 else: 1
let baseAddr =
try:
if sma.len - 1 - baseP2PEndIdx < 0:
@@ -53,27 +53,22 @@ proc multiAddrToBytes*(
let portNum = ?catch(port.split('/')[2].parseInt()).mapErr(x => x.msg)
res.add(portNum.uint16.toBytesBE())
# PeerID (39 bytes), if using circuit relay, this represents the relay server
let p2pPart = ?ma.getPart(multiCodec("p2p"))
let peerId = ?PeerId.init(?p2pPart.protoArgument()).mapErr(x => $x)
if peerId.data.len != PeerIdByteLen:
return err("unsupported PeerId key type")
res.add(peerId.data)
if isCircuitRelay:
let dstPart = ?sma[^1]
let dstPeerId = ?PeerId.init(?dstPart.protoArgument()).mapErr(x => $x)
if dstPeerId.data.len != PeerIdByteLen:
let relayIdPart = ?ma.getPart(multiCodec("p2p"))
let relayId = ?PeerId.init(?relayIdPart.protoArgument()).mapErr(x => $x)
if relayId.data.len != PeerIdByteLen:
return err("unsupported PeerId key type")
res.add(dstPeerId.data)
res.add(relayId.data)
# PeerID (39 bytes)
res.add(peerId.data)
if res.len > AddrSize:
return err("Address must be <= " & $AddrSize & " bytes")
return ok(res & newSeq[byte](AddrSize - res.len))
proc bytesToMultiAddr*(bytes: openArray[byte]): Result[MultiAddress, string] =
proc bytesToMultiAddr*(bytes: openArray[byte]): MaResult[(PeerId, MultiAddress)] =
if bytes.len != AddrSize:
return err("Address must be exactly " & $AddrSize & " bytes")
@@ -83,15 +78,18 @@ proc bytesToMultiAddr*(bytes: openArray[byte]): Result[MultiAddress, string] =
quic = if bytes[4] == 1: "/quic-v1" else: ""
port = uint16.fromBytesBE(bytes[5 .. 6])
# peerId1 represents the circuit relay server addr if p2p-circuit addr, otherwise it's the node's actual peerId
peerId1 = "/p2p/" & Base58.encode(bytes[7 ..< 46])
peerId1Bytes = bytes[7 ..< 46]
peerId2Bytes = bytes[7 + PeerIdByteLen ..< 7 + (PeerIdByteLen * 2)]
# peerId2 will contain a value only if this is a p2p-circuit address
peerId2 =
if peerId2Bytes != newSeq[byte](PeerIdByteLen):
"/p2p-circuit/p2p/" & Base58.encode(peerId2Bytes)
else:
""
return MultiAddress
.init("/ip4/" & ip & "/" & protocol & "/" & $port & quic & peerId1 & peerId2)
.mapErr(x => $x)
let ma = ?MultiAddress.init("/ip4/" & ip & "/" & protocol & "/" & $port & quic)
return
if peerId2Bytes != newSeq[byte](PeerIdByteLen):
# Has circuit relay address
let relayIdMa = ?MultiAddress.init(multiCodec("p2p"), peerId1Bytes)
let p2pCircuitMa = ?MultiAddress.init(multiCodec("p2p-circuit"))
let peerId = ?PeerId.init(peerId2Bytes).mapErr(x => $x)
ok((peerId, ?(ma & relayIdMa & p2pCircuitMa).catch().mapErr(x => x.msg)))
else:
let peerId = ?PeerId.init(peerId1Bytes).mapErr(x => $x)
ok((peerId, ma))

View File

@@ -6,7 +6,6 @@ const
k* = 16 # Security parameter
r* = 5 # Maximum path length
t* = 6 # t.k - combined length of next hop address and delay
L* = 3 # Path length
AlphaSize* = 32 # Group element
BetaSize* = ((r * (t + 1)) + 1) * k # bytes
GammaSize* = 16 # Output of HMAC-SHA-256, truncated to 16 bytes
@@ -15,7 +14,7 @@ const
AddrSize* = (t * k) - DelaySize # Address size
PacketSize* = 4608 # Total packet size (from spec)
MessageSize* = PacketSize - HeaderSize - k # Size of the message itself
payloadSize* = MessageSize + k # Total payload size
PayloadSize* = MessageSize + k # Total payload size
SurbSize* = HeaderSize + k + AddrSize
# Size of a surb packet inside the message payload
SurbLenSize* = 1 # Size of the field storing the number of surbs
@@ -70,8 +69,8 @@ proc serialize*(message: Message): seq[byte] =
proc deserialize*(
T: typedesc[Message], serializedMessage: openArray[byte]
): Result[T, string] =
if len(serializedMessage) != payloadSize:
return err("Serialized message must be exactly " & $payloadSize & " bytes")
if len(serializedMessage) != PayloadSize:
return err("Serialized message must be exactly " & $PayloadSize & " bytes")
return ok(serializedMessage[k ..^ 1])
type Hop* = object
@@ -139,6 +138,23 @@ proc serialize*(info: RoutingInfo): seq[byte] =
return addrBytes & info.Delay & info.Gamma & info.Beta
proc readBytes(
data: openArray[byte], offset: var int, readSize: Opt[int] = Opt.none(int)
): Result[seq[byte], string] =
if data.len < offset:
return err("not enough data")
readSize.withValue(size):
if data.len < offset + size:
return err("not enough data")
let slice = data[offset ..< offset + size]
offset += size
return ok(slice)
let slice = data[offset .. ^1]
offset = data.len
return ok(slice)
proc deserialize*(T: typedesc[RoutingInfo], data: openArray[byte]): Result[T, string] =
if len(data) != BetaSize + ((t + 1) * k):
return err("Data must be exactly " & $(BetaSize + ((t + 1) * k)) & " bytes")
@@ -146,13 +162,13 @@ proc deserialize*(T: typedesc[RoutingInfo], data: openArray[byte]): Result[T, st
let hop = Hop.deserialize(data[0 .. AddrSize - 1]).valueOr:
return err("Deserialize hop error: " & error)
var offset: int = AddrSize
return ok(
RoutingInfo(
Addr: hop,
Delay: data[AddrSize .. (AddrSize + DelaySize - 1)],
Gamma: data[(AddrSize + DelaySize) .. (AddrSize + DelaySize + GammaSize - 1)],
Beta:
data[(AddrSize + DelaySize + GammaSize) .. (((r * (t + 1)) + t + 2) * k) - 1],
Delay: ?data.readBytes(offset, Opt.some(DelaySize)),
Gamma: ?data.readBytes(offset, Opt.some(GammaSize)),
Beta: ?data.readBytes(offset, Opt.some(BetaSize)),
)
)
@@ -183,7 +199,7 @@ type
Key* = seq[byte]
I* = array[SurbIdLen, byte]
SURBIdentifier* = array[SurbIdLen, byte]
SURB* = object
hop*: Hop
@@ -201,23 +217,6 @@ proc serializeMessageWithSURBs*(
surbs.mapIt(it.hop.serialize() & it.header.serialize() & it.key).concat()
ok(byte(surbs.len) & surbBytes & msg)
proc readBytes(
data: seq[byte], offset: var int, readSize: Opt[int] = Opt.none(int)
): Result[seq[byte], string] =
if data.len < offset:
return err("not enough data")
readSize.withValue(size):
if data.len < offset + size:
return err("not enough data")
let slice = data[offset ..< offset + size]
offset += size
return ok(slice)
let slice = data[offset .. ^1]
offset = data.len
return ok(slice)
proc extractSURBs*(msg: seq[byte]): Result[(seq[SURB], seq[byte]), string] =
var offset = 0
let surbsLenBytes = ?readBytes(msg, offset, Opt.some(1))

View File

@@ -1,11 +1,17 @@
import results, sequtils, stew/endians2
import ./[crypto, curve25519, serialization, tag_manager]
import ../../crypto/crypto
import ../../utils/sequninit
const PathLength* = 3 # Path length (L)
const PaddingLength = (((t + 1) * (r - PathLength)) + 1) * k
type ProcessingStatus* = enum
Exit # Packet processed successfully at exit
Intermediate # Packet processed successfully at intermediate node
Duplicate # Packet was discarded due to duplicate tag
InvalidMAC # Packet was discarded due to MAC verification failure
Exit
Intermediate
Reply
Duplicate
InvalidMAC
proc computeAlpha(
publicKeys: openArray[FieldElement]
@@ -79,16 +85,12 @@ proc computeFillerStrings(s: seq[seq[byte]]): Result[seq[byte], string] =
return ok(filler)
const paddingLength = (((t + 1) * (r - L)) + 1) * k
# Function to compute:
proc computeBetaGamma(
s: seq[seq[byte]],
hop: openArray[Hop],
delay: openArray[seq[byte]],
destHop: Hop,
id: I,
id: SURBIdentifier,
): Result[tuple[beta: seq[byte], gamma: seq[byte]], string] =
## Calculates the following elements:
## - Beta: The nested encrypted routing information. It encodes the next hop address, the forwarding delay, integrity check Gamma for the next hop, and the Beta for subsequent hops.
@@ -112,7 +114,7 @@ proc computeBetaGamma(
# Compute Beta and Gamma
if i == sLen - 1:
let destBytes = destHop.serialize()
let destPadding = destBytes & delay[i] & @id & newSeq[byte](paddingLength)
let destPadding = destBytes & delay[i] & @id & newSeq[byte](PaddingLength)
let aes = aes_ctr(beta_aes_key, beta_iv, destPadding)
@@ -149,6 +151,70 @@ proc computeDelta(s: seq[seq[byte]], msg: Message): Result[seq[byte], string] =
return ok(delta)
proc createSURB*(
publicKeys: openArray[FieldElement],
delay: openArray[seq[byte]],
hops: openArray[Hop],
id: SURBIdentifier,
rng: ref HmacDrbgContext = newRng(),
): Result[SURB, string] =
if id == default(SURBIdentifier):
return err("id should be initialized")
# Compute alpha and shared secrets
let (alpha_0, s) = computeAlpha(publicKeys).valueOr:
return err("Error in alpha generation: " & error)
# Compute beta and gamma
let (beta_0, gamma_0) = computeBetaGamma(s, hops, delay, Hop(), id).valueOr:
return err("Error in beta and gamma generation: " & error)
# Generate key
var key = newSeqUninit[byte](k)
rng[].generate(key)
return ok(
SURB(
hop: hops[0],
header: Header.init(alpha_0, beta_0, gamma_0),
secret: Opt.some(s),
key: key,
)
)
proc useSURB*(surb: SURB, msg: Message): SphinxPacket =
# Derive AES key and IV
let
delta_aes_key = deriveKeyMaterial("delta_aes_key", surb.key).kdf()
delta_iv = deriveKeyMaterial("delta_iv", surb.key).kdf()
# Compute Delta
let serializedMsg = msg.serialize()
let delta = aes_ctr(delta_aes_key, delta_iv, serializedMsg)
return SphinxPacket.init(surb.header, delta)
proc processReply*(
key: seq[byte], s: seq[seq[byte]], delta_prime: seq[byte]
): Result[seq[byte], string] =
var delta = delta_prime[0 ..^ 1]
var key_prime = key
for i in 0 .. s.len:
if i != 0:
key_prime = s[i - 1]
let
delta_aes_key = deriveKeyMaterial("delta_aes_key", key_prime).kdf()
delta_iv = deriveKeyMaterial("delta_iv", key_prime).kdf()
delta = aes_ctr(delta_aes_key, delta_iv, delta)
let deserializeMsg = Message.deserialize(delta).valueOr:
return err("Message deserialization error: " & error)
return ok(deserializeMsg)
proc wrapInSphinxPacket*(
msg: Message,
publicKeys: openArray[FieldElement],
@@ -161,7 +227,9 @@ proc wrapInSphinxPacket*(
return err("Error in alpha generation: " & error)
# Compute beta and gamma
let (beta_0, gamma_0) = computeBetaGamma(s, hop, delay, destHop, default(I)).valueOr:
let (beta_0, gamma_0) = computeBetaGamma(
s, hop, delay, destHop, default(SURBIdentifier)
).valueOr:
return err("Error in beta and gamma generation: " & error)
# Compute delta
@@ -184,9 +252,27 @@ type ProcessedSphinxPacket* = object
nextHop*: Hop
delayMs*: int
serializedSphinxPacket*: seq[byte]
of ProcessingStatus.Reply:
id*: SURBIdentifier
delta_prime*: seq[byte]
else:
discard
proc isZeros(data: seq[byte], startIdx: int, endIdx: int): bool =
doAssert 0 <= startIdx and endIdx < data.len and startIdx <= endIdx
for i in startIdx .. endIdx:
if data[i] != 0:
return false
return true
template extractSurbId(data: seq[byte]): SURBIdentifier =
const startIndex = t * k
const endIndex = startIndex + SurbIdLen - 1
doAssert data.len > startIndex and endIndex < data.len
var id: SURBIdentifier
copyMem(addr id[0], addr data[startIndex], SurbIdLen)
id
proc processSphinxPacket*(
sphinxPacket: SphinxPacket, privateKey: FieldElement, tm: var TagManager
): Result[ProcessedSphinxPacket, string] =
@@ -228,19 +314,16 @@ proc processSphinxPacket*(
let delta_prime = aes_ctr(delta_aes_key, delta_iv, payload)
# Compute B
var zeroPadding = newSeq[byte]((t + 1) * k)
let zeroPadding = newSeq[byte]((t + 1) * k)
let B = aes_ctr(beta_aes_key, beta_iv, beta & zeroPadding)
# Check if B has the required prefix for the original message
zeroPadding = newSeq[byte](paddingLength)
if B[((t + 1) * k) .. ((t + 1) * k) + paddingLength - 1] == zeroPadding:
if B.isZeros((t + 1) * k, ((t + 1) * k) + PaddingLength - 1):
let hop = Hop.deserialize(B[0 .. AddrSize - 1]).valueOr:
return err(error)
if B[AddrSize .. ((t + 1) * k) - 1] == newSeq[byte](k + 2):
if delta_prime[0 .. (k - 1)] == newSeq[byte](k):
if B.isZeros(AddrSize, ((t + 1) * k) - 1):
if delta_prime.isZeros(0, k - 1):
let msg = Message.deserialize(delta_prime).valueOr:
return err("Message deserialization error: " & error)
return ok(
@@ -250,9 +333,12 @@ proc processSphinxPacket*(
)
else:
return err("delta_prime should be all zeros")
elif B[0 .. (t * k) - 1] == newSeq[byte](t * k):
# TODO: handle REPLY case
discard
elif B.isZeros(0, (t * k) - 1):
return ok(
ProcessedSphinxPacket(
status: Reply, id: B.extractSurbId(), delta_prime: delta_prime
)
)
else:
# Extract routing information from B
let routingInfo = RoutingInfo.deserialize(B).valueOr:

View File

@@ -150,57 +150,6 @@ proc createStreamServer*[T](
except CatchableError as exc:
raise newException(LPError, "failed simpler createStreamServer: " & exc.msg, exc)
proc createAsyncSocket*(ma: MultiAddress): AsyncFD {.raises: [ValueError, LPError].} =
## Create new asynchronous socket using MultiAddress' ``ma`` socket type and
## protocol information.
##
## Returns ``asyncInvalidSocket`` on error.
##
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
##
var
socktype: SockType = SockType.SOCK_STREAM
protocol: Protocol = Protocol.IPPROTO_TCP
let address = initTAddress(ma).tryGet()
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
if ma[1].tryGet().protoCode().tryGet() == multiCodec("udp"):
socktype = SockType.SOCK_DGRAM
protocol = Protocol.IPPROTO_UDP
elif ma[1].tryGet().protoCode().tryGet() == multiCodec("tcp"):
socktype = SockType.SOCK_STREAM
protocol = Protocol.IPPROTO_TCP
elif address.family in {AddressFamily.Unix}:
socktype = SockType.SOCK_STREAM
protocol = cast[Protocol](0)
else:
return asyncInvalidSocket
try:
createAsyncSocket(address.getDomain(), socktype, protocol)
except CatchableError as exc:
raise newException(
LPError, "Convert exception to LPError in createAsyncSocket: " & exc.msg, exc
)
proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool {.raises: [LPError].} =
## Bind socket ``sock`` to MultiAddress ``ma``.
##
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
##
var
saddr: Sockaddr_storage
slen: SockLen
let address = initTAddress(ma).tryGet()
toSAddr(address, saddr, slen)
if bindSocket(SocketHandle(sock), cast[ptr SockAddr](addr saddr), slen) == 0:
result = true
else:
result = false
proc getLocalAddress*(sock: AsyncFD): TransportAddress =
## Retrieve local socket ``sock`` address.
##

View File

@@ -1,123 +0,0 @@
#!/bin/bash
# Copyright (c) 2018-2020 Status Research & Development GmbH. Licensed under
# either of:
# - Apache License, version 2.0
# - MIT license
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
set -e
force=false
verbose=false
CACHE_DIR=""
LIBP2P_COMMIT="124530a3"
while [[ "$#" -gt 0 ]]; do
case "$1" in
-f|--force) force=true ;;
-v|--verbose) verbose=true ;;
-h|--help)
echo "Usage: $0 [-f|--force] [-v|--verbose] [CACHE_DIR] [COMMIT]"
exit 0
;;
*)
# First non-option is CACHE_DIR, second is LIBP2P_COMMIT
if [[ -z "$CACHE_DIR" ]]; then
CACHE_DIR="$1"
elif [[ "$LIBP2P_COMMIT" == "124530a3" ]]; then
LIBP2P_COMMIT="$1"
else
echo "Unknown argument: $1"
exit 1
fi
;;
esac
shift
done
SUBREPO_DIR="vendor/go/src/github.com/libp2p/go-libp2p-daemon"
if [[ ! -e "$SUBREPO_DIR" ]]; then
SUBREPO_DIR="go-libp2p-daemon"
rm -rf "$SUBREPO_DIR"
git clone -q https://github.com/libp2p/go-libp2p-daemon
cd "$SUBREPO_DIR"
git checkout -q "$LIBP2P_COMMIT"
cd ..
fi
## env vars
[[ -z "$BUILD_MSG" ]] && BUILD_MSG="Building p2pd ${LIBP2P_COMMIT}"
# Windows detection
if uname | grep -qiE "mingw|msys"; then
EXE_SUFFIX=".exe"
# otherwise it fails in AppVeyor due to https://github.com/git-for-windows/git/issues/2495
GIT_TIMESTAMP_ARG="--date=unix" # available since Git 2.9.4
else
EXE_SUFFIX=""
GIT_TIMESTAMP_ARG="--date=format-local:%s" # available since Git 2.7.0
fi
TARGET_DIR="$(go env GOPATH)/bin"
TARGET_BINARY="${TARGET_DIR}/p2pd${EXE_SUFFIX}"
target_needs_rebuilding() {
REBUILD=0
NO_REBUILD=1
if [[ -n "$CACHE_DIR" && -e "${CACHE_DIR}/p2pd${EXE_SUFFIX}" ]]; then
mkdir -p "${TARGET_DIR}"
cp -a "$CACHE_DIR"/* "${TARGET_DIR}/"
fi
# compare the built commit's timestamp to the date of the last commit (keep in mind that Git doesn't preserve file timestamps)
if [[ -e "${TARGET_DIR}/timestamp" && $(cat "${TARGET_DIR}/timestamp") -eq $(cd "$SUBREPO_DIR"; git log --pretty=format:%cd -n 1 ${GIT_TIMESTAMP_ARG}) ]]; then
return $NO_REBUILD
else
return $REBUILD
fi
}
build_target() {
echo -e "$BUILD_MSG"
pushd "$SUBREPO_DIR"
# Go module downloads can fail randomly in CI VMs, so retry them a few times
MAX_RETRIES=5
CURR=0
while [[ $CURR -lt $MAX_RETRIES ]]; do
FAILED=0
go get ./... && break || FAILED=1
CURR=$(( CURR + 1 ))
if $verbose; then
echo "retry #${CURR}"
fi
done
if [[ $FAILED == 1 ]]; then
echo "Error: still fails after retrying ${MAX_RETRIES} times."
exit 1
fi
go install ./...
# record the last commit's timestamp
git log --pretty=format:%cd -n 1 ${GIT_TIMESTAMP_ARG} > "${TARGET_DIR}/timestamp"
popd
# update the CI cache
if [[ -n "$CACHE_DIR" ]]; then
rm -rf "$CACHE_DIR"
mkdir "$CACHE_DIR"
cp -a "$TARGET_DIR"/* "$CACHE_DIR"/
fi
echo "Binary built successfully: $TARGET_BINARY"
}
if $force || target_needs_rebuilding; then
build_target
else
echo "No rebuild needed."
fi

View File

@@ -1,665 +0,0 @@
import chronos, chronicles, stew/byteutils
import helpers
import ../libp2p
import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto]
import ../libp2p/protocols/connectivity/relay/[relay, client, utils]
type
SwitchCreator = proc(
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
prov = proc(config: TransportConfig): Transport =
TcpTransport.new({}, config.upgr),
relay: Relay = Relay.new(circuitRelayV1 = true),
): Switch {.gcsafe, raises: [LPError].}
DaemonPeerInfo = daemonapi.PeerInfo
proc writeLp(s: StreamTransport, msg: string | seq[byte]): Future[int] {.gcsafe.} =
## write lenght prefixed
var buf = initVBuffer()
buf.writeSeq(msg)
buf.finish()
result = s.write(buf.buffer)
proc readLp(s: StreamTransport): Future[seq[byte]] {.async.} =
## read length prefixed msg
var
size: uint
length: int
res: VarintResult[void]
result = newSeq[byte](10)
for i in 0 ..< len(result):
await s.readExactly(addr result[i], 1)
res = LP.getUVarint(result.toOpenArray(0, i), length, size)
if res.isOk():
break
res.expect("Valid varint")
result.setLen(size)
if size > 0.uint:
await s.readExactly(addr result[0], int(size))
proc testPubSubDaemonPublish(
gossip: bool = false, count: int = 1, swCreator: SwitchCreator
) {.async.} =
var pubsubData = "TEST MESSAGE"
var testTopic = "test-topic"
var msgData = pubsubData.toBytes()
var flags = {PSFloodSub}
if gossip:
flags = {PSGossipSub}
let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity()
let nativeNode = swCreator()
let pubsub =
if gossip:
GossipSub.init(switch = nativeNode).PubSub
else:
FloodSub.init(switch = nativeNode).PubSub
nativeNode.mount(pubsub)
await nativeNode.start()
await pubsub.start()
let nativePeer = nativeNode.peerInfo
var finished = false
var times = 0
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
let smsg = string.fromBytes(data)
check smsg == pubsubData
times.inc()
if times >= count and not finished:
finished = true
await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses)
await sleepAsync(500.millis)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
proc pubsubHandler(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async: (raises: [CatchableError]).} =
result = true # don't cancel subscription
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
pubsub.subscribe(testTopic, nativeHandler)
await sleepAsync(3.seconds)
proc publisher() {.async.} =
while not finished:
await daemonNode.pubsubPublish(testTopic, msgData)
await sleepAsync(250.millis)
await wait(publisher(), 5.minutes) # should be plenty of time
await nativeNode.stop()
await pubsub.stop()
await daemonNode.close()
proc testPubSubNodePublish(
gossip: bool = false, count: int = 1, swCreator: SwitchCreator
) {.async.} =
var pubsubData = "TEST MESSAGE"
var testTopic = "test-topic"
var msgData = pubsubData.toBytes()
var flags = {PSFloodSub}
if gossip:
flags = {PSGossipSub}
let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity()
let nativeNode = swCreator()
let pubsub =
if gossip:
GossipSub.init(switch = nativeNode).PubSub
else:
FloodSub.init(switch = nativeNode).PubSub
nativeNode.mount(pubsub)
await nativeNode.start()
await pubsub.start()
let nativePeer = nativeNode.peerInfo
await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses)
await sleepAsync(500.millis)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var times = 0
var finished = false
proc pubsubHandler(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async: (raises: [CatchableError]).} =
let smsg = string.fromBytes(message.data)
check smsg == pubsubData
times.inc()
if times >= count and not finished:
finished = true
result = true # don't cancel subscription
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
discard
pubsub.subscribe(testTopic, nativeHandler)
await sleepAsync(3.seconds)
proc publisher() {.async.} =
while not finished:
discard await pubsub.publish(testTopic, msgData)
await sleepAsync(250.millis)
await wait(publisher(), 5.minutes) # should be plenty of time
check finished
await nativeNode.stop()
await pubsub.stop()
await daemonNode.close()
proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
suite "Interop using " & name:
# TODO: chronos transports are leaking,
# but those are tracked for both the daemon
# and libp2p, so not sure which one it is,
# need to investigate more
# teardown:
# checkTrackers()
# TODO: this test is failing sometimes on windows
# For some reason we receive EOF before test 4 sometimes
asyncTest "native -> daemon multiple reads and writes":
var protos = @["/test-stream"]
let nativeNode = swCreator()
await nativeNode.start()
let daemonNode = await newDaemonApi()
let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[void]("test.future")
proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
check string.fromBytes(await stream.transp.readLp()) == "test 1"
discard await stream.transp.writeLp("test 2")
check string.fromBytes(await stream.transp.readLp()) == "test 3"
discard await stream.transp.writeLp("test 4")
testFuture.complete()
await daemonNode.addHandler(protos, daemonHandler)
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
await conn.writeLp("test 1")
check "test 2" == string.fromBytes((await conn.readLp(1024)))
await conn.writeLp("test 3")
check "test 4" == string.fromBytes((await conn.readLp(1024)))
await wait(testFuture, 10.secs)
await nativeNode.stop()
await daemonNode.close()
await sleepAsync(500.millis)
asyncTest "native -> daemon connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
# We are preparing expect string, which should be prefixed with varint
# length and do not have `\r\n` suffix, because we going to use
# readLine().
var buffer = initVBuffer()
buffer.writeSeq(test & "\r\n")
buffer.finish()
var expect = newString(len(buffer) - 2)
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
let nativeNode = swCreator()
await nativeNode.start()
let daemonNode = await newDaemonApi()
let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[string]("test.future")
proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
# We should perform `readLp()` instead of `readLine()`. `readLine()`
# here reads actually length prefixed string.
var line = await stream.transp.readLine()
check line == expect
testFuture.complete(line)
await stream.close()
await daemonNode.addHandler(protos, daemonHandler)
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
await conn.writeLp(test & "\r\n")
check expect == (await wait(testFuture, 10.secs))
await conn.close()
await nativeNode.stop()
await daemonNode.close()
asyncTest "daemon -> native connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
var testFuture = newFuture[string]("test.future")
proc nativeHandler(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
var line = string.fromBytes(await conn.readLp(1024))
check line == test
testFuture.complete(line)
except CancelledError as e:
raise e
except CatchableError:
check false # should not be here
finally:
await conn.close()
# custom proto
var proto = new LPProtocol
proto.handler = nativeHandler
proto.codec = protos[0] # codec
let nativeNode = swCreator()
nativeNode.mount(proto)
await nativeNode.start()
let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
discard await stream.transp.writeLp(test)
check test == (await wait(testFuture, 10.secs))
await stream.close()
await nativeNode.stop()
await daemonNode.close()
await sleepAsync(500.millis)
asyncTest "native -> daemon websocket connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
var testFuture = newFuture[string]("test.future")
proc nativeHandler(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
var line = string.fromBytes(await conn.readLp(1024))
check line == test
testFuture.complete(line)
except CancelledError as e:
raise e
except CatchableError:
check false # should not be here
finally:
await conn.close()
# custom proto
var proto = new LPProtocol
proto.handler = nativeHandler
proto.codec = protos[0] # codec
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
let nativeNode = swCreator(
ma = wsAddress,
prov = proc(config: TransportConfig): Transport =
WsTransport.new(config.upgr),
)
nativeNode.mount(proto)
await nativeNode.start()
let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
discard await stream.transp.writeLp(test)
check test == (await wait(testFuture, 10.secs))
await stream.close()
await nativeNode.stop()
await daemonNode.close()
await sleepAsync(500.millis)
asyncTest "daemon -> native websocket connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
# We are preparing expect string, which should be prefixed with varint
# length and do not have `\r\n` suffix, because we going to use
# readLine().
var buffer = initVBuffer()
buffer.writeSeq(test & "\r\n")
buffer.finish()
var expect = newString(len(buffer) - 2)
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
let nativeNode = SwitchBuilder
.new()
.withAddress(wsAddress)
.withRng(crypto.newRng())
.withMplex()
.withWsTransport()
.withNoise()
.build()
await nativeNode.start()
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[string]("test.future")
proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
# We should perform `readLp()` instead of `readLine()`. `readLine()`
# here reads actually length prefixed string.
var line = await stream.transp.readLine()
check line == expect
testFuture.complete(line)
await stream.close()
await daemonNode.addHandler(protos, daemonHandler)
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
await conn.writeLp(test & "\r\n")
check expect == (await wait(testFuture, 10.secs))
await conn.close()
await nativeNode.stop()
await daemonNode.close()
asyncTest "daemon -> multiple reads and writes":
var protos = @["/test-stream"]
var testFuture = newFuture[void]("test.future")
proc nativeHandler(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
check "test 1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test 2".toBytes())
check "test 3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test 4".toBytes())
testFuture.complete()
except CancelledError as e:
raise e
except CatchableError:
check false # should not be here
finally:
await conn.close()
# custom proto
var proto = new LPProtocol
proto.handler = nativeHandler
proto.codec = protos[0] # codec
let nativeNode = swCreator()
nativeNode.mount(proto)
await nativeNode.start()
let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
asyncDiscard stream.transp.writeLp("test 1")
check "test 2" == string.fromBytes(await stream.transp.readLp())
asyncDiscard stream.transp.writeLp("test 3")
check "test 4" == string.fromBytes(await stream.transp.readLp())
await wait(testFuture, 10.secs)
await stream.close()
await nativeNode.stop()
await daemonNode.close()
asyncTest "read write multiple":
var protos = @["/test-stream"]
var test = "TEST STRING"
var count = 0
var testFuture = newFuture[int]("test.future")
proc nativeHandler(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
while count < 10:
var line = string.fromBytes(await conn.readLp(1024))
check line == test
await conn.writeLp(test.toBytes())
count.inc()
testFuture.complete(count)
except CancelledError as e:
raise e
except CatchableError:
check false # should not be here
finally:
await conn.close()
# custom proto
var proto = new LPProtocol
proto.handler = nativeHandler
proto.codec = protos[0] # codec
let nativeNode = swCreator()
nativeNode.mount(proto)
await nativeNode.start()
let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
var count2 = 0
while count2 < 10:
discard await stream.transp.writeLp(test)
let line = await stream.transp.readLp()
check test == string.fromBytes(line)
inc(count2)
check 10 == (await wait(testFuture, 1.minutes))
await stream.close()
await nativeNode.stop()
await daemonNode.close()
asyncTest "floodsub: daemon publish one":
await testPubSubDaemonPublish(swCreator = swCreator)
asyncTest "floodsub: daemon publish many":
await testPubSubDaemonPublish(count = 10, swCreator = swCreator)
asyncTest "gossipsub: daemon publish one":
await testPubSubDaemonPublish(gossip = true, swCreator = swCreator)
asyncTest "gossipsub: daemon publish many":
await testPubSubDaemonPublish(gossip = true, count = 10, swCreator = swCreator)
asyncTest "floodsub: node publish one":
await testPubSubNodePublish(swCreator = swCreator)
asyncTest "floodsub: node publish many":
await testPubSubNodePublish(count = 10, swCreator = swCreator)
asyncTest "gossipsub: node publish one":
await testPubSubNodePublish(gossip = true, swCreator = swCreator)
asyncTest "gossipsub: node publish many":
await testPubSubNodePublish(gossip = true, count = 10, swCreator = swCreator)
proc relayInteropTests*(name: string, relayCreator: SwitchCreator) =
suite "Interop relay using " & name:
asyncTest "NativeSrc -> NativeRelay -> DaemonDst":
let closeBlocker = newFuture[void]()
let daemonFinished = newFuture[void]()
# TODO: This Future blocks the daemonHandler after sending the last message.
# It exists because there's a strange behavior where stream.close sends
# a Rst instead of Fin. We should investigate this at some point.
proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
check "line1" == string.fromBytes(await stream.transp.readLp())
discard await stream.transp.writeLp("line2")
check "line3" == string.fromBytes(await stream.transp.readLp())
discard await stream.transp.writeLp("line4")
await closeBlocker
await stream.close()
daemonFinished.complete()
let
maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
src = relayCreator(maSrc, relay = RelayClient.new(circuitRelayV1 = true))
rel = relayCreator(maRel)
await src.start()
await rel.start()
let daemonNode = await newDaemonApi()
let daemonPeer = await daemonNode.identity()
let maStr =
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
let maddr = MultiAddress.init(maStr).tryGet()
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await rel.connect(daemonPeer.peer, daemonPeer.addresses)
await daemonNode.addHandler(@["/testCustom"], daemonHandler)
let conn = await src.dial(daemonPeer.peer, @[maddr], @["/testCustom"])
await conn.writeLp("line1")
check string.fromBytes(await conn.readLp(1024)) == "line2"
await conn.writeLp("line3")
check string.fromBytes(await conn.readLp(1024)) == "line4"
closeBlocker.complete()
await daemonFinished
await conn.close()
await allFutures(src.stop(), rel.stop())
try:
await daemonNode.close()
except CatchableError as e:
when defined(windows):
# On Windows, daemon close may fail due to socket race condition
# This is expected behavior and can be safely ignored
discard
else:
raise e
asyncTest "DaemonSrc -> NativeRelay -> NativeDst":
proc customHandler(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
check "line1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("line2")
check "line3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("line4")
except CancelledError as e:
raise e
except CatchableError:
check false # should not be here
finally:
await conn.close()
let protos = @["/customProto", RelayV1Codec]
var customProto = new LPProtocol
customProto.handler = customHandler
customProto.codec = protos[0]
let
maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
rel = relayCreator(maRel)
dst = relayCreator(maDst, relay = RelayClient.new())
dst.mount(customProto)
await rel.start()
await dst.start()
let daemonNode = await newDaemonApi()
let daemonPeer = await daemonNode.identity()
let maStr =
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
let maddr = MultiAddress.init(maStr).tryGet()
await daemonNode.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await rel.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
await daemonNode.connect(dst.peerInfo.peerId, @[maddr])
var stream = await daemonNode.openStream(dst.peerInfo.peerId, protos)
discard await stream.transp.writeLp("line1")
check string.fromBytes(await stream.transp.readLp()) == "line2"
discard await stream.transp.writeLp("line3")
check string.fromBytes(await stream.transp.readLp()) == "line4"
await allFutures(dst.stop(), rel.stop())
await daemonNode.close()
asyncTest "NativeSrc -> DaemonRelay -> NativeDst":
proc customHandler(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
check "line1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("line2")
check "line3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("line4")
except CancelledError as e:
raise e
except CatchableError:
check false # should not be here
finally:
await conn.close()
let protos = @["/customProto", RelayV1Codec]
var customProto = new LPProtocol
customProto.handler = customHandler
customProto.codec = protos[0]
let
maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
src = relayCreator(maSrc, relay = RelayClient.new())
dst = relayCreator(maDst, relay = RelayClient.new())
dst.mount(customProto)
await src.start()
await dst.start()
let daemonNode = await newDaemonApi({RelayHop})
let daemonPeer = await daemonNode.identity()
let maStr = $daemonPeer.addresses[0] & "/p2p/" & $daemonPeer.peer & "/p2p-circuit"
let maddr = MultiAddress.init(maStr).tryGet()
await src.connect(daemonPeer.peer, daemonPeer.addresses)
await daemonNode.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
let conn = await src.dial(dst.peerInfo.peerId, @[maddr], protos[0])
await conn.writeLp("line1")
check string.fromBytes(await conn.readLp(1024)) == "line2"
await conn.writeLp("line3")
check string.fromBytes(await conn.readLp(1024)) == "line4"
await allFutures(src.stop(), dst.stop())
await daemonNode.close()

View File

@@ -1,4 +1,5 @@
{.used.}
import
testdiscoverymngr, testrendezvous, testrendezvousprotobuf, testrendezvousinterface
testdiscoverymngr, testrendezvous, testrendezvouserrors, testrendezvousprotobuf,
testrendezvousinterface

View File

@@ -9,9 +9,17 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import sequtils, strutils
import sequtils
import chronos
import ../../libp2p/[protocols/rendezvous, switch]
import
../../libp2p/[
protocols/rendezvous,
protocols/rendezvous/protobuf,
peerinfo,
switch,
routing_record,
crypto/crypto,
]
import ../../libp2p/discovery/discoverymngr
import ../../libp2p/utils/offsettedseq
import ../helpers
@@ -73,6 +81,20 @@ suite "RendezVous":
peerRecords.len == 1
peerRecords[0] == peerNodes[0].switch.peerInfo.signedPeerRecord.data
asyncTest "Peer is not registered when peer record validation fails":
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
(rendezvousNode & peerNodes).startAndDeferStop()
await connectNodes(peerNodes[0], rendezvousNode)
peerNodes[0].switch.peerInfo.signedPeerRecord =
createCorruptedSignedPeerRecord(peerNodes[0].switch.peerInfo.peerId)
const namespace = "foo"
await peerNodes[0].advertise(namespace)
check rendezvousNode.registered.s.len == 0
asyncTest "Unsubscribe removes registered peer from remote":
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
(rendezvousNode & peerNodes).startAndDeferStop()
@@ -87,6 +109,17 @@ suite "RendezVous":
await peerNodes[0].unsubscribe(namespace)
check (await peerNodes[0].request(Opt.some(namespace))).len == 0
asyncTest "Unsubscribe for not registered namespace is ignored":
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
(rendezvousNode & peerNodes).startAndDeferStop()
await connectNodes(peerNodes[0], rendezvousNode)
await peerNodes[0].advertise("foo")
await peerNodes[0].unsubscribe("bar")
check rendezvousNode.registered.s.len == 1
asyncTest "Consecutive requests with namespace returns peers with pagination":
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(11)
(rendezvousNode & peerNodes).startAndDeferStop()
@@ -447,26 +480,3 @@ suite "RendezVous":
# 1001st registration ignored, limit reached
await peerRdv.advertise(namespace)
check rendezvousNode.registered.s.len == RegistrationLimitPerPeer
asyncTest "Various local error":
let rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
expect AdvertiseError:
discard await rdv.request(Opt.some("A".repeat(300)))
expect AdvertiseError:
discard await rdv.request(Opt.some("A"), -1)
expect AdvertiseError:
discard await rdv.request(Opt.some("A"), 3000)
expect AdvertiseError:
await rdv.advertise("A".repeat(300))
expect AdvertiseError:
await rdv.advertise("A", 73.hours)
expect AdvertiseError:
await rdv.advertise("A", 30.seconds)
test "Various config error":
expect RendezVousError:
discard RendezVous.new(minDuration = 30.seconds)
expect RendezVousError:
discard RendezVous.new(maxDuration = 73.hours)
expect RendezVousError:
discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)

View File

@@ -0,0 +1,153 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import sequtils
import strformat
import strutils
import chronos
import
../../libp2p/[
protocols/rendezvous,
protocols/rendezvous/protobuf,
peerinfo,
switch,
routing_record,
crypto/crypto,
]
import ../../libp2p/discovery/discoverymngr
import ../helpers
import ./utils
suite "RendezVous Errors":
teardown:
checkTrackers()
asyncTest "Various local error":
let rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
expect AdvertiseError:
discard await rdv.request(Opt.some("A".repeat(300)))
expect AdvertiseError:
discard await rdv.request(Opt.some("A"), -1)
expect AdvertiseError:
discard await rdv.request(Opt.some("A"), 3000)
expect AdvertiseError:
await rdv.advertise("A".repeat(300))
expect AdvertiseError:
await rdv.advertise("A", 73.hours)
expect AdvertiseError:
await rdv.advertise("A", 30.seconds)
test "Various config error":
expect RendezVousError:
discard RendezVous.new(minDuration = 30.seconds)
expect RendezVousError:
discard RendezVous.new(maxDuration = 73.hours)
expect RendezVousError:
discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)
let testCases =
@[
(
"Register - Invalid Namespace",
(
proc(node: RendezVous): Message =
prepareRegisterMessage(
"A".repeat(300),
node.switch.peerInfo.signedPeerRecord.encode().get,
2.hours,
)
),
ResponseStatus.InvalidNamespace,
),
(
"Register - Invalid Signed Peer Record",
(
proc(node: RendezVous): Message =
# Malformed SPR - empty bytes will fail validation
prepareRegisterMessage("namespace", newSeq[byte](), 2.hours)
),
ResponseStatus.InvalidSignedPeerRecord,
),
(
"Register - Invalid TTL",
(
proc(node: RendezVous): Message =
prepareRegisterMessage(
"namespace", node.switch.peerInfo.signedPeerRecord.encode().get, 73.hours
)
),
ResponseStatus.InvalidTTL,
),
(
"Discover - Invalid Namespace",
(
proc(node: RendezVous): Message =
prepareDiscoverMessage(ns = Opt.some("A".repeat(300)))
),
ResponseStatus.InvalidNamespace,
),
(
"Discover - Invalid Cookie",
(
proc(node: RendezVous): Message =
# Empty buffer will fail Cookie.decode().tryGet() and yield InvalidCookie
prepareDiscoverMessage(cookie = Opt.some(newSeq[byte]()))
),
ResponseStatus.InvalidCookie,
),
]
for test in testCases:
let (testName, getMessage, expectedStatus) = test
asyncTest &"Node returns ERROR_CODE for invalid message - {testName}":
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
(rendezvousNode & peerNodes).startAndDeferStop()
await connectNodes(peerNodes[0], rendezvousNode)
let
peerNode = peerNodes[0]
messageBuf = encode(getMessage(peerNode)).buffer
let
responseBuf = await sendRdvMessage(peerNode, rendezvousNode, messageBuf)
responseMessage = Message.decode(responseBuf).tryGet()
actualStatus =
if responseMessage.registerResponse.isSome():
responseMessage.registerResponse.get.status
else:
responseMessage.discoverResponse.get.status
check actualStatus == expectedStatus
asyncTest "Node returns NotAuthorized when Register exceeding peer limit":
let (rendezvousNode, peerNodes) = setupRendezvousNodeWithPeerNodes(1)
(rendezvousNode & peerNodes).startAndDeferStop()
await connectNodes(peerNodes[0], rendezvousNode)
# Pre-populate registrations up to the limit for this peer under the same namespace
let namespace = "namespaceNA"
await populatePeerRegistrations(
peerNodes[0], rendezvousNode, namespace, RegistrationLimitPerPeer
)
# Attempt one more registration which should be rejected with NotAuthorized
let messageBuf = encode(
prepareRegisterMessage(
namespace, peerNodes[0].switch.peerInfo.signedPeerRecord.encode().get, 2.hours
)
).buffer
let responseBuf = await sendRdvMessage(peerNodes[0], rendezvousNode, messageBuf)
let responseMessage = Message.decode(responseBuf).tryGet()
check responseMessage.registerResponse.get.status == ResponseStatus.NotAuthorized

View File

@@ -1,6 +1,16 @@
import sequtils
import chronos
import ../../libp2p/[protobuf/minprotobuf, protocols/rendezvous, switch, builders]
import sequtils
import
../../libp2p/[
builders,
crypto/crypto,
peerid,
protobuf/minprotobuf,
protocols/rendezvous,
protocols/rendezvous/protobuf,
routing_record,
switch,
]
proc createSwitch*(rdv: RendezVous = RendezVous.new()): Switch =
SwitchBuilder
@@ -76,3 +86,41 @@ proc populatePeerRegistrations*(
let record = targetRdv.registered.s[0]
for i in 0 ..< count - 1:
targetRdv.registered.s.add(record)
proc createCorruptedSignedPeerRecord*(peerId: PeerId): SignedPeerRecord =
let rng = newRng()
let wrongPrivKey = PrivateKey.random(rng[]).tryGet()
let record = PeerRecord.init(peerId, @[])
SignedPeerRecord.init(wrongPrivKey, record).tryGet()
proc sendRdvMessage*(
node: RendezVous, target: RendezVous, buffer: seq[byte]
): Future[seq[byte]] {.async.} =
let conn = await node.switch.dial(target.switch.peerInfo.peerId, RendezVousCodec)
defer:
await conn.close()
await conn.writeLp(buffer)
let response = await conn.readLp(4096)
response
proc prepareRegisterMessage*(
namespace: string, spr: seq[byte], ttl: Duration
): Message =
Message(
msgType: MessageType.Register,
register: Opt.some(
Register(ns: namespace, signedPeerRecord: spr, ttl: Opt.some(ttl.seconds.uint64))
),
)
proc prepareDiscoverMessage*(
ns: Opt[string] = Opt.none(string),
limit: Opt[uint64] = Opt.none(uint64),
cookie: Opt[seq[byte]] = Opt.none(seq[byte]),
): Message =
Message(
msgType: MessageType.Discover,
discover: Opt.some(Discover(ns: ns, limit: limit, cookie: cookie)),
)

View File

@@ -0,0 +1,101 @@
{.used.}
import results, unittest
import ../../libp2p/peerid
import ../../libp2p/protocols/mix/[serialization, fragmentation]
suite "Fragmentation":
let peerId =
PeerId.init("16Uiu2HAmFkwLVsVh6gGPmSm9R3X4scJ5thVdKfWYeJsKeVrbcgVC").get()
test "serialize and deserialize message chunk":
let
message = newSeq[byte](DataSize)
chunks = padAndChunkMessage(message, peerId)
serialized = chunks[0].serialize()
deserialized =
MessageChunk.deserialize(serialized).expect("Deserialization error")
check chunks[0] == deserialized
test "pad and unpad small message":
let
message = cast[seq[byte]]("Hello, World!")
messageBytesLen = len(message)
paddedMsg = addPadding(message, peerId)
unpaddedMessage = removePadding(paddedMsg).expect("Unpad error")
let (paddingLength, data, _) = paddedMsg.get()
check:
paddingLength == uint16(DataSize - messageBytesLen)
data.len == DataSize
unpaddedMessage.len == messageBytesLen
test "pad and chunk large message":
let
message = newSeq[byte](MessageSize * 2 + (MessageSize - 1))
messageBytesLen = len(message)
chunks = padAndChunkMessage(message, peerId)
totalChunks = max(1, ceilDiv(messageBytesLen, DataSize))
check chunks.len == totalChunks
for i in 0 ..< totalChunks:
let (paddingLength, data, _) = chunks[i].get()
if i != totalChunks - 1:
check paddingLength == 0
else:
let chunkSize = messageBytesLen mod DataSize
check paddingLength == uint16(DataSize - chunkSize)
check data.len == DataSize
test "chunk sequence numbers are consecutive":
let
message = newSeq[byte](MessageSize * 3)
messageBytesLen = len(message)
chunks = padAndChunkMessage(message, peerId)
totalChunks = max(1, ceilDiv(messageBytesLen, DataSize))
check chunks.len == totalChunks
let (_, _, firstSeqNo) = chunks[0].get()
for i in 1 ..< totalChunks:
let (_, _, seqNo) = chunks[i].get()
check seqNo == firstSeqNo + uint32(i)
test "chunk data reconstructs original message":
let
message = cast[seq[byte]]("This is a test message that will be split into multiple chunks.")
chunks = padAndChunkMessage(message, peerId)
var reconstructed: seq[byte]
for chunk in chunks:
let (paddingLength, data, _) = chunk.get()
reconstructed.add(data[paddingLength.int ..^ 1])
check reconstructed == message
test "empty message handling":
let
message = cast[seq[byte]]("")
chunks = padAndChunkMessage(message, peerId)
check chunks.len == 1
let (paddingLength, _, _) = chunks[0].get()
check paddingLength == uint16(DataSize)
test "message size equal to chunk size":
let
message = newSeq[byte](DataSize)
chunks = padAndChunkMessage(message, peerId)
check chunks.len == 1
let (paddingLength, _, _) = chunks[0].get()
check paddingLength == 0

84
tests/mix/testmixnode.nim Normal file
View File

@@ -0,0 +1,84 @@
{.used.}
import strformat, results, unittest2
import ../../libp2p/[crypto/crypto, crypto/secp, multiaddress, multicodec, peerid]
import ../../libp2p/protocols/mix/[curve25519, mix_node]
suite "Mix Node Tests":
var mixNodes {.threadvar.}: MixNodes
setup:
const count = 5
let mixNodes = initializeMixNodes(count).expect("could not generate mix nodes")
check:
mixNodes.len == count
teardown:
deleteNodeInfoFolder()
deletePubInfoFolder()
test "get mix node by index":
for i in 0 ..< count:
let node = mixNodes[i]
let
(peerId, multiAddr, mixPubKey, mixPrivKey, libp2pPubKey, libp2pPrivKey) =
node.get()
pubKeyProto = PublicKey(scheme: Secp256k1, skkey: libp2pPubKey)
expectedPeerId = PeerId.init(pubKeyProto).get()
expectedMA = MultiAddress.init(fmt"/ip4/0.0.0.0/tcp/{4242 + i}").tryGet()
check:
peerId == expectedPeerId
multiAddr == expectedMA
fieldElementToBytes(mixPubKey).len == FieldElementSize
fieldElementToBytes(mixPrivKey).len == FieldElementSize
libp2pPubKey.getBytes().len == SkRawPublicKeySize
libp2pPrivKey.getBytes().len == SkRawPrivateKeySize
test "find mixnode by peerid":
for i in 0 ..< count:
let
node = mixNodes[i]
foundNode = mixNodes.findByPeerId(node.peerId).expect("find mix node error")
check:
foundNode == node
test "invalid_peer_id_lookup":
let peerId = PeerId.random().expect("could not generate peerId")
check:
mixNodes.findByPeerId(peerId).isErr()
test "write and read mixnodeinfo":
for i in 0 ..< count:
let node = mixNodes[i]
node.writeToFile(i).expect("File write error")
let readNode = MixNodeInfo.readFromFile(i).expect("Read node error")
check:
readNode == node
test "write and read mixpubinfo":
for i in 0 ..< count:
let mixPubInfo =
mixNodes.getMixPubInfoByIndex(i).expect("couldnt obtain mixpubinfo")
mixPubInfo.writeToFile(i).expect("File write error")
let readNode = MixPubInfo.readFromFile(i).expect("File read error")
check:
mixPubInfo == readNode
test "read nonexistent mixnodeinfo":
check:
MixNodeInfo.readFromFile(999).isErr()
test "generate mix nodes with different ports":
const count = 3
let basePort = 5000
let mixNodes =
initializeMixNodes(count, basePort).expect("could not generate mix nodes")
for i in 0 ..< count:
let tcpPort = mixNodes[i].multiAddr.getPart(multiCodec("tcp")).value()
let expectedPort = MultiAddress.init(fmt"/tcp/{basePort + i}").tryGet()
check:
tcpPort == expectedPort

View File

@@ -2,58 +2,56 @@
import results, unittest
import ../../libp2p/protocols/mix/[serialization, multiaddr]
import ../../libp2p/multiaddress
import ../../libp2p/[peerid, multiaddress]
template maddr(ma: string): MultiAddress =
MultiAddress.init(ma).tryGet()
template maddrConversionShouldFail(ma: string, msg: string) =
proc maddrConversionShouldFail(ma: string, msg: string) =
test msg:
echo MultiAddress
.init(ma)
.expect("could not initialize multiaddr")
.multiAddrToBytes()
.error()
let peerId = PeerId.random().expect("could not generate peerId")
let ma = MultiAddress.init(ma).expect("could not initialize multiaddr")
check:
MultiAddress
.init(ma)
.expect("could not initialize multiaddr")
.multiAddrToBytes().isErr
multiAddrToBytes(peerId, ma).isErr
suite "Utils tests":
test "multi_addr_conversion":
test "multiaddress conversion":
let multiAddrs = [
"/ip4/0.0.0.0/tcp/4242/p2p/16Uiu2HAmFkwLVsVh6gGPmSm9R3X4scJ5thVdKfWYeJsKeVrbcgVC",
"/ip4/10.0.0.1/tcp/1234/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2",
"/ip4/192.168.1.1/udp/8080/quic-v1/p2p/16Uiu2HAm6WNzw8AssyPscYYi8x1bY5wXyQrGTShRH75bh5dPCjBQ",
"/ip4/10.0.0.1/tcp/1234/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2/p2p-circuit/p2p/16Uiu2HAm6WNzw8AssyPscYYi8x1bY5wXyQrGTShRH75bh5dPCjBQ",
"/ip4/10.0.0.1/udp/1234/quic-v1/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2/p2p-circuit/p2p/16Uiu2HAm6WNzw8AssyPscYYi8x1bY5wXyQrGTShRH75bh5dPCjBQ",
"/ip4/0.0.0.0/tcp/4242", "/ip4/10.0.0.1/tcp/1234",
"/ip4/192.168.1.1/udp/8080/quic-v1",
"/ip4/10.0.0.1/tcp/1234/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2/p2p-circuit",
"/ip4/10.0.0.1/udp/1234/quic-v1/p2p/16Uiu2HAmDHw4mwBdEjxjJPhrt8Eq1kvDjXAuwkqCmhNiz363AFV2/p2p-circuit",
]
for multiAddr in multiAddrs:
let ma = maddr(multiAddr)
let multiAddrBytes = ma.multiAddrToBytes().expect("conversion failed")
let
ma = maddr(multiAddr)
peerId = PeerId.random().expect("could not generate peerId")
multiAddrBytes = multiAddrToBytes(peerId, ma).expect("conversion failed")
check multiAddrBytes.len == AddrSize
let deserializedMa = bytesToMultiAddr(multiAddrBytes).expect("conversion failed")
check deserializedMa == ma
let (dPeerId, deserializedMa) =
bytesToMultiAddr(multiAddrBytes).expect("conversion failed")
check:
deserializedMa == ma
dPeerId == peerId
maddrConversionShouldFail("/ip4/0.0.0.0/tcp/4242/quic-v1/", "invalid protocol")
maddrConversionShouldFail("/ip4/0.0.0.0", "invalid multiaddress format")
maddrConversionShouldFail(
"/ip4/0.0.0.0/tcp/4242/quic-v1/p2p/16Uiu2HAmFkwLVsVh6gGPmSm9R3X4scJ5thVdKfWYeJsKeVrbcgVC",
"invalid_protocol",
"/ip4/0.0.0.0/tcp/4242/p2p-circuit", "invalid multiaddress format circuit relay"
)
maddrConversionShouldFail(
"/ip4/0.0.0.0/tcp/4242/p2p/QmcycySVeRSftFQGM392xCqDh6UUbhSU9ykNpxrFBPX3gJ",
"invalid_peerid_length",
"/ip4/0.0.0.0/tcp/4242/p2p/QmcycySVeRSftFQGM392xCqDh6UUbhSU9ykNpxrFBPX3gJ/p2p-circuit",
"invalid peerId in circuit relay addr",
)
maddrConversionShouldFail("/ip4/0.0.0.0/tcp/4242", "invalid_multiaddress_format")
maddrConversionShouldFail(
"/ip4/0.0.0.0/tcp/4242/p2p-circuit", "invalid_multiaddress_format_circuit_relay"
)
test "invalid_addr_length":
test "invalid address length":
let invalidBytes = newSeq[byte](AddrSize - 1)
check:
bytesToMultiAddr(invalidBytes).isErr

View File

@@ -56,7 +56,7 @@ suite "serialization_tests":
header = Header.init(
newSeq[byte](AlphaSize), newSeq[byte](BetaSize), newSeq[byte](GammaSize)
)
payload = newSeq[byte](payloadSize)
payload = newSeq[byte](PayloadSize)
packet = SphinxPacket.init(header, payload)
let serialized = packet.serialize()

View File

@@ -1,12 +1,12 @@
{.used.}
import random, results, unittest
import random, results, unittest, chronicles
import ../../libp2p/crypto/crypto
import ../../libp2p/protocols/mix/[curve25519, serialization, sphinx, tag_manager]
import bearssl/rand
# Helper function to pad/truncate message
proc padMessage(message: openArray[byte], size: int): seq[byte] =
proc addPadding(message: openArray[byte], size: int): seq[byte] =
if message.len >= size:
return message[0 .. size - 1] # Truncate if larger
else:
@@ -39,8 +39,8 @@ proc createDummyData(): (
dest = Hop.init(newSeq[byte](AddrSize))
return (message, privateKeys, publicKeys, delay, hops, dest)
proc randomI(): I =
newRng()[].generate(I)
template randomI(): SURBIdentifier =
newRng()[].generate(SURBIdentifier)
# Unit tests for sphinx.nim
suite "Sphinx Tests":
@@ -52,7 +52,7 @@ suite "Sphinx Tests":
teardown:
clearTags(tm)
test "sphinx_wrap_and_process":
test "sphinx wrap and process":
let (message, privateKeys, publicKeys, delay, hops, dest) = createDummyData()
let packetBytes = wrapInSphinxPacket(message, publicKeys, delay, hops, dest).expect(
@@ -94,7 +94,7 @@ suite "Sphinx Tests":
processedSP3.status == Exit
processedSP3.messageChunk == message
test "sphinx_wrap_empty_public_keys":
test "sphinx wrap empty public keys":
let (message, _, _, delay, _, dest) = createDummyData()
check wrapInSphinxPacket(message, @[], delay, @[], dest).isErr
@@ -118,7 +118,7 @@ suite "Sphinx Tests":
check invalidMacPkt.status == InvalidMAC
test "sphinx_process_duplicate_tag":
test "sphinx process duplicate tag":
let (message, privateKeys, publicKeys, delay, hops, dest) = createDummyData()
let packetBytes = wrapInSphinxPacket(message, publicKeys, delay, hops, dest).expect(
@@ -140,7 +140,7 @@ suite "Sphinx Tests":
check processedSP2.status == Duplicate
test "sphinx_wrap_and_process_message_sizes":
test "sphinx wrap and process message sizes":
let MessageSizes = @[32, 64, 128, 256, 512]
for size in MessageSizes:
let (_, privateKeys, publicKeys, delay, hops, dest) = createDummyData()
@@ -148,7 +148,7 @@ suite "Sphinx Tests":
randomize()
for i in 0 ..< size:
message[i] = byte(rand(256))
let paddedMessage = padMessage(message, MessageSize)
let paddedMessage = addPadding(message, MessageSize)
let packetBytes = wrapInSphinxPacket(paddedMessage, publicKeys, delay, hops, dest)
.expect("Sphinx wrap error")
@@ -186,3 +186,150 @@ suite "Sphinx Tests":
check:
processedSP3.status == Exit
processedSP3.messageChunk == paddedMessage
test "create and use surb":
let (message, privateKeys, publicKeys, delay, hops, _) = createDummyData()
let surb =
createSURB(publicKeys, delay, hops, randomI()).expect("Create SURB error")
let packetBytes = useSURB(surb, message).serialize()
check packetBytes.len == PacketSize
let packet = SphinxPacket.deserialize(packetBytes).expect("Sphinx wrap error")
let processedSP1 =
processSphinxPacket(packet, privateKeys[0], tm).expect("Sphinx processing error")
check:
processedSP1.status == Intermediate
processedSP1.serializedSphinxPacket.len == PacketSize
let processedPacket1 = SphinxPacket
.deserialize(processedSP1.serializedSphinxPacket)
.expect("Sphinx wrap error")
let processedSP2 = processSphinxPacket(processedPacket1, privateKeys[1], tm).expect(
"Sphinx processing error"
)
check:
processedSP2.status == Intermediate
processedSP2.serializedSphinxPacket.len == PacketSize
let processedPacket2 = SphinxPacket
.deserialize(processedSP2.serializedSphinxPacket)
.expect("Sphinx wrap error")
let processedSP3 = processSphinxPacket(processedPacket2, privateKeys[2], tm).expect(
"Sphinx processing error"
)
check processedSP3.status == Reply
let msg = processReply(surb.key, surb.secret.get(), processedSP3.delta_prime).expect(
"Reply processing failed"
)
check msg == message
test "create surb empty public keys":
let (message, _, _, delay, _, _) = createDummyData()
check createSURB(@[], delay, @[], randomI()).isErr()
test "surb sphinx process invalid mac":
let (message, privateKeys, publicKeys, delay, hops, _) = createDummyData()
let surb =
createSURB(publicKeys, delay, hops, randomI()).expect("Create SURB error")
let packetBytes = useSURB(surb, message).serialize()
check packetBytes.len == PacketSize
# Corrupt the MAC for testing
var tamperedPacketBytes = packetBytes
tamperedPacketBytes[0] = packetBytes[0] xor 0x01
let tamperedPacket =
SphinxPacket.deserialize(tamperedPacketBytes).expect("Sphinx wrap error")
let processedSP1 = processSphinxPacket(tamperedPacket, privateKeys[0], tm).expect(
"Sphinx processing error"
)
check processedSP1.status == InvalidMAC
test "surb sphinx process duplicate tag":
let (message, privateKeys, publicKeys, delay, hops, _) = createDummyData()
let surb =
createSURB(publicKeys, delay, hops, randomI()).expect("Create SURB error")
let packetBytes = useSURB(surb, message).serialize()
check packetBytes.len == PacketSize
let packet = SphinxPacket.deserialize(packetBytes).expect("Sphinx wrap error")
# Process the packet twice to test duplicate tag handling
let processedSP1 =
processSphinxPacket(packet, privateKeys[0], tm).expect("Sphinx processing error")
check processedSP1.status == Intermediate
let processedSP2 =
processSphinxPacket(packet, privateKeys[0], tm).expect("Sphinx processing error")
check processedSP2.status == Duplicate
test "create and use surb message sizes":
let messageSizes = @[32, 64, 128, 256, 512]
for size in messageSizes:
let (_, privateKeys, publicKeys, delay, hops, _) = createDummyData()
var message = newSeq[byte](size)
randomize()
for i in 0 ..< size:
message[i] = byte(rand(256))
let paddedMessage = addPadding(message, MessageSize)
let surb =
createSURB(publicKeys, delay, hops, randomI()).expect("Create SURB error")
let packetBytes = useSURB(surb, Message(paddedMessage)).serialize()
check packetBytes.len == PacketSize
let packet = SphinxPacket.deserialize(packetBytes).expect("Sphinx wrap error")
let processedSP1 = processSphinxPacket(packet, privateKeys[0], tm).expect(
"Sphinx processing error"
)
check:
processedSP1.status == Intermediate
processedSP1.serializedSphinxPacket.len == PacketSize
let processedPacket1 = SphinxPacket
.deserialize(processedSP1.serializedSphinxPacket)
.expect("Sphinx wrap error")
let processedSP2 = processSphinxPacket(processedPacket1, privateKeys[1], tm)
.expect("Sphinx processing error")
check:
processedSP2.status == Intermediate
processedSP2.serializedSphinxPacket.len == PacketSize
let processedPacket2 = SphinxPacket
.deserialize(processedSP2.serializedSphinxPacket)
.expect("Sphinx wrap error")
let processedSP3 = processSphinxPacket(processedPacket2, privateKeys[2], tm)
.expect("Sphinx processing error")
check processedSP3.status == Reply
let msg = processReply(surb.key, surb.secret.get(), processedSP3.delta_prime)
.expect("Reply processing failed")
check paddedMessage == msg

View File

@@ -1,3 +1,3 @@
{.used.}
import testnative, ./pubsub/testpubsub, testinterop, testdaemon
import testnative, ./pubsub/testpubsub

View File

@@ -31,6 +31,8 @@ import ./helpers
when defined(linux) and defined(amd64):
{.used.}
import ../libp2p/utils/ipaddr
suite "AutoTLS Integration":
asyncTeardown:
checkTrackers()

View File

@@ -1,123 +0,0 @@
import chronos, unittest2, helpers
import
../libp2p/daemon/daemonapi,
../libp2p/multiaddress,
../libp2p/multicodec,
../libp2p/cid,
../libp2p/multihash,
../libp2p/peerid
when defined(nimHasUsed):
{.used.}
proc identitySpawnTest(): Future[bool] {.async.} =
var api = await newDaemonApi()
var data = await api.identity()
await api.close()
result = true
proc connectStreamTest(): Future[bool] {.async.} =
var api1 = await newDaemonApi()
var api2 = await newDaemonApi()
var id1 = await api1.identity()
var id2 = await api2.identity()
var protos = @["/test-stream"]
var test = "TEST STRING"
var testFuture = newFuture[string]("test.future")
proc streamHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
var line = await stream.transp.readLine()
testFuture.complete(line)
await api2.addHandler(protos, streamHandler)
await api1.connect(id2.peer, id2.addresses)
# echo await api1.listPeers()
var stream = await api1.openStream(id2.peer, protos)
let sent = await stream.transp.write(test & "\r\n")
doAssert(sent == len(test) + 2)
doAssert((await wait(testFuture, 10.seconds)) == test)
await stream.close()
await api1.close()
await api2.close()
result = true
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE"
var msgData = cast[seq[byte]](pubsubData)
var api1, api2: DaemonAPI
api1 = await newDaemonApi(f)
api2 = await newDaemonApi(f)
var id1 = await api1.identity()
var id2 = await api2.identity()
var resultsCount = 0
var handlerFuture1 = newFuture[void]()
var handlerFuture2 = newFuture[void]()
proc pubsubHandler1(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async: (raises: [CatchableError]).} =
let smsg = cast[string](message.data)
if smsg == pubsubData:
inc(resultsCount)
handlerFuture1.complete()
# Callback must return `false` to close subscription channel.
result = false
proc pubsubHandler2(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async: (raises: [CatchableError]).} =
let smsg = cast[string](message.data)
if smsg == pubsubData:
inc(resultsCount)
handlerFuture2.complete()
# Callback must return `false` to close subscription channel.
result = false
await api1.connect(id2.peer, id2.addresses)
await api2.connect(id1.peer, id1.addresses)
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
await sleepAsync(1.seconds)
var topics1 = await api1.pubsubGetTopics()
var topics2 = await api2.pubsubGetTopics()
if len(topics1) == 1 and len(topics2) == 1:
var peers1 = await api1.pubsubListPeers("test-topic")
var peers2 = await api2.pubsubListPeers("test-topic")
if len(peers1) == 1 and len(peers2) == 1:
# Publish test data via api1.
await sleepAsync(250.milliseconds)
await api1.pubsubPublish("test-topic", msgData)
var res =
await one(allFutures(handlerFuture1, handlerFuture2), sleepAsync(5.seconds))
await api1.close()
await api2.close()
if resultsCount == 2:
result = true
suite "libp2p-daemon test suite":
test "Simple spawn and get identity test":
check:
waitFor(identitySpawnTest()) == true
test "Connect/Accept peer/stream test":
check:
waitFor(connectStreamTest()) == true
asyncTest "GossipSub test":
checkUntilTimeoutCustom(10.seconds, 100.milliseconds):
(await pubsubTest({PSGossipSub}))
asyncTest "FloodSub test":
checkUntilTimeoutCustom(10.seconds, 100.milliseconds):
(await pubsubTest({PSFloodSub}))

View File

@@ -1,58 +0,0 @@
{.used.}
import helpers, commoninterop
import ../libp2p
import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/relay
proc switchMplexCreator(
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
prov = proc(config: TransportConfig): Transport =
TcpTransport.new({}, config.upgr),
relay: Relay = Relay.new(circuitRelayV1 = true),
): Switch {.raises: [LPError].} =
SwitchBuilder
.new()
.withSignedPeerRecord(false)
.withMaxConnections(MaxConnections)
.withRng(crypto.newRng())
.withAddresses(@[ma])
.withMaxIn(-1)
.withMaxOut(-1)
.withTransport(prov)
.withMplex()
.withMaxConnsPerPeer(MaxConnectionsPerPeer)
.withPeerStore(capacity = 1000)
.withNoise()
.withCircuitRelay(relay)
.withNameResolver(nil)
.build()
proc switchYamuxCreator(
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
prov = proc(config: TransportConfig): Transport =
TcpTransport.new({}, config.upgr),
relay: Relay = Relay.new(circuitRelayV1 = true),
): Switch {.raises: [LPError].} =
SwitchBuilder
.new()
.withSignedPeerRecord(false)
.withMaxConnections(MaxConnections)
.withRng(crypto.newRng())
.withAddresses(@[ma])
.withMaxIn(-1)
.withMaxOut(-1)
.withTransport(prov)
.withYamux()
.withMaxConnsPerPeer(MaxConnectionsPerPeer)
.withPeerStore(capacity = 1000)
.withNoise()
.withCircuitRelay(relay)
.withNameResolver(nil)
.build()
suite "Tests interop":
commonInteropTests("mplex", switchMplexCreator)
relayInteropTests("mplex", switchMplexCreator)
commonInteropTests("yamux", switchYamuxCreator)
relayInteropTests("yamux", switchYamuxCreator)

View File

@@ -123,7 +123,7 @@ const
"/ip4/127.0.0.1/ipfs", "/ip4/127.0.0.1/ipfs/tcp", "/p2p-circuit/50",
]
PathVectors = ["/unix/tmp/p2pd.sock", "/unix/a/b/c/d/e/f/g/h/i.sock"]
PathVectors = ["/unix/a/b/c/d/e/f/g/h/i.sock"]
PathExpects = [
"90030E2F746D702F703270642E736F636B",

View File

@@ -45,5 +45,5 @@ when defined(libp2p_autotls_support):
import
mix/[
testcrypto, testcurve25519, testtagmanager, testseqnogenerator, testserialization,
testmixmessage, testsphinx, testmultiaddr,
testmixmessage, testsphinx, testmultiaddr, testfragmentation, testmixnode,
]

View File

@@ -28,7 +28,6 @@ import
builders,
upgrademngrs/upgrade,
varint,
daemon/daemonapi,
]
import ./helpers

View File

@@ -25,6 +25,7 @@ import
builders,
protocols/ping,
wire,
utils/ipaddr,
]
from ./helpers import suite, asyncTest, asyncTeardown, checkTrackers, skip, check