Compare commits

..

3 Commits

Author SHA1 Message Date
Diego
62846e773b Use Opt instead of Option 2022-09-21 18:04:50 +02:00
Diego
c4c8790aa6 Opt compilation error 2022-09-21 13:41:09 +02:00
Diego
160b7d3be6 Make observedAddr optional 2022-09-21 12:31:32 +02:00
46 changed files with 336 additions and 2001 deletions

View File

@@ -63,7 +63,7 @@ jobs:
git push origin gh-pages
update_site:
if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/docs'
if: github.ref == 'refs/heads/master'
name: 'Rebuild website'
runs-on: ubuntu-latest
steps:
@@ -74,12 +74,8 @@ jobs:
with:
python-version: 3.x
- uses: jiro4989/setup-nim-action@v1
with:
nim-version: 'stable'
- name: Generate website
run: pip install mkdocs-material && nimble website
run: pip install mkdocs-material && mkdocs build
- name: Clone the gh-pages branch
uses: actions/checkout@v2

1
.gitignore vendored
View File

@@ -13,6 +13,5 @@ build/
.vscode/
.DS_Store
tests/pubsub/testgossipsub
examples/*.md
nimble.develop
nimble.paths

18
.pinned
View File

@@ -1,16 +1,16 @@
bearssl;https://github.com/status-im/nim-bearssl@#f4c4233de453cb7eac0ce3f3ffad6496295f83ab
bearssl;https://github.com/status-im/nim-bearssl@#25009951ff8e0006171d566e3c7dc73a8231c2ed
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
chronos;https://github.com/status-im/nim-chronos@#9df76c39df254c7ff0cec6dec5c9f345f2819c91
dnsclient;https://github.com/ba0f3/dnsclient.nim@#6647ca8bd9ffcc13adaecb9cb6453032063967db
faststreams;https://github.com/status-im/nim-faststreams@#6112432b3a81d9db116cd5d64c39648881cfff29
httputils;https://github.com/status-im/nim-http-utils@#e88e231dfcef4585fe3b2fbd9b664dbd28a88040
chronos;https://github.com/status-im/nim-chronos@#41b82cdea34744148600b67a9154331b76181189
dnsclient;https://github.com/ba0f3/dnsclient.nim@#4960de2b345f567b12f09a08e9967af104ab39a3
faststreams;https://github.com/status-im/nim-faststreams@#49e2c52eb5dda46b1c9c10d079abe7bffe6cea89
httputils;https://github.com/status-im/nim-http-utils@#f83fbce4d6ec7927b75be3f85e4fa905fcb69788
json_serialization;https://github.com/status-im/nim-json-serialization@#e5b18fb710c3d0167ec79f3b892f5a7a1bc6d1a4
metrics;https://github.com/status-im/nim-metrics@#0a6477268e850d7bc98347b3875301524871765f
metrics;https://github.com/status-im/nim-metrics@#9070af9c830e93e5239ddc488cd65aa6f609ba73
nimcrypto;https://github.com/cheatfate/nimcrypto@#24e006df85927f64916e60511620583b11403178
secp256k1;https://github.com/status-im/nim-secp256k1@#c7f1a37d9b0f17292649bfed8bf6cef83cf4221f
secp256k1;https://github.com/status-im/nim-secp256k1@#5340cf188168d6afcafc8023770d880f067c0b2f
serialization;https://github.com/status-im/nim-serialization@#493d18b8292fc03aa4f835fd825dea1183f97466
stew;https://github.com/status-im/nim-stew@#f2e58ba4c8da65548c824e4fa8732db9739f6505
stew;https://github.com/status-im/nim-stew@#598246620da5c41d0e92a8dd6aab0755381b21cd
testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
unittest2;https://github.com/status-im/nim-unittest2@#f180f596c88dfd266f746ed6f8dbebce39c824db
websock;https://github.com/status-im/nim-websock@#2424f2b215c0546f97d8b147e21544521c7545b0
websock;https://github.com/status-im/nim-websock@#8a72c0f7690802753b1d59887745b1ce1f0c8b3d
zlib;https://github.com/status-im/nim-zlib@#6a6670afba6b97b29b920340e2641978c05ab4d8

View File

@@ -2,5 +2,5 @@
Welcome to the nim-libp2p documentation!
Here, you'll find [tutorials](tutorial_1_connect.md) to help you get started, as well as
Here, you'll find [tutorials](tutorial_1_connect.md) to help you get started, as well as [examples](directchat.nim) and
the [full reference](https://status-im.github.io/nim-libp2p/master/libp2p.html).

View File

@@ -1,14 +1,6 @@
## # Circuit Relay example
##
## Circuit Relay can be used when a node cannot reach another node
## directly, but can reach it through a another node (the Relay).
##
## That may happen because of NAT, Firewalls, or incompatible transports.
##
## More informations [here](https://docs.libp2p.io/concepts/circuit-relay/).
import chronos, stew/byteutils
import libp2p,
libp2p/protocols/connectivity/relay/[relay, client]
import ../libp2p,
../libp2p/protocols/connectivity/relay/[relay, client]
# Helper to create a circuit relay node
proc createCircuitRelaySwitch(r: Relay): Switch =

View File

@@ -5,7 +5,7 @@ import
strformat, strutils,
stew/byteutils,
chronos,
libp2p
../libp2p
const DefaultAddr = "/ip4/127.0.0.1/tcp/0"

View File

@@ -1,6 +1,6 @@
import chronos # an efficient library for async
import stew/byteutils # various utils
import libp2p
import ../libp2p # when installed through nimble, just use `import libp2p`
##
# Create our custom protocol

View File

@@ -0,0 +1,108 @@
# Simple ping tutorial
Hi all, welcome to the first nim-libp2p tutorial!
!!! tips ""
This tutorial is for everyone who is interested in building peer-to-peer applications. No Nim programming experience is needed.
To give you a quick overview, **Nim** is the programming language we are using and **nim-libp2p** is the Nim implementation of [libp2p](https://libp2p.io/), a modular library that enables the development of peer-to-peer network applications.
Hope you'll find it helpful in your journey of learning. Happy coding! ;)
## Before you start
The only prerequisite here is [Nim](https://nim-lang.org/), the programming language with a Python-like syntax and a performance similar to C. Detailed information can be found [here](https://nim-lang.org/docs/tut1.html).
Install Nim via their [official website](https://nim-lang.org/install.html).
Check Nim's installation via `nim --version` and its package manager Nimble via `nimble --version`.
You can now install the latest version of `nim-libp2p`:
```bash
nimble install libp2p@#master
```
## A simple ping application
We'll start by creating a simple application, which is starting two libp2p [switch](https://docs.libp2p.io/concepts/stream-multiplexing/#switch-swarm), and pinging each other using the [Ping](https://docs.libp2p.io/concepts/protocols/#ping) protocol.
!!! tips ""
You can extract the code from this tutorial by running `nim c -r tools/markdown_runner.nim examples/tutorial_1_connect.md` in the libp2p folder!
Let's create a `part1.nim`, and import our dependencies:
```nim
import chronos
import libp2p
import libp2p/protocols/ping
```
[chronos](https://github.com/status-im/nim-chronos) the asynchronous framework used by `nim-libp2p`
Next, we'll create an helper procedure to create our switches. A switch needs a bit of configuration, and it will be easier to do this configuration only once:
```nim
proc createSwitch(ma: MultiAddress, rng: ref HmacDrbgContext): Switch =
var switch = SwitchBuilder
.new()
.withRng(rng) # Give the application RNG
.withAddress(ma) # Our local address(es)
.withTcpTransport() # Use TCP as transport
.withMplex() # Use Mplex as muxer
.withNoise() # Use Noise as secure manager
.build()
return switch
```
This will create a switch using [Mplex](https://docs.libp2p.io/concepts/stream-multiplexing/) as a multiplexer, Noise to secure the communication, and TCP as an underlying transport.
You can of course tweak this, to use a different or multiple transport, or tweak the configuration of Mplex and Noise, but this is some sane defaults that we'll use going forward.
Let's now start to create our main procedure:
```nim
proc main() {.async, gcsafe.} =
let
rng = newRng()
localAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
pingProtocol = Ping.new(rng=rng)
```
We created some variables that we'll need for the rest of the application: the global `rng` instance, our `localAddress`, and an instance of the `Ping` protocol.
The address is in the [MultiAddress](https://github.com/multiformats/multiaddr) format. The port `0` means "take any port available".
`tryGet` is procedure which is part of [nim-result](https://github.com/arnetheduck/nim-result/), that will throw an exception if the supplied MultiAddress is invalid.
We can now create our two switches:
```nim
let
switch1 = createSwitch(localAddress, rng)
switch2 = createSwitch(localAddress, rng)
switch1.mount(pingProtocol)
await switch1.start()
await switch2.start()
```
We've **mounted** the `pingProtocol` on our first switch. This means that the first switch will actually listen for any ping requests coming in, and handle them accordingly.
Now that we've started the nodes, they are listening for incoming peers.
We can find out which port was attributed, and the resulting local addresses, by using `switch1.peerInfo.addrs`.
We'll **dial** the first switch from the second one, by specifying it's **Peer ID**, it's **MultiAddress** and the **`Ping` protocol codec**:
```nim
let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, PingCodec)
```
We now have a `Ping` connection setup between the second and the first switch, we can use it to actually ping the node:
```nim
# ping the other node and echo the ping duration
echo "ping: ", await pingProtocol.ping(conn)
# We must close the connection ourselves when we're done with it
await conn.close()
```
And that's it! Just a little bit of cleanup: shutting down the switches, waiting for them to stop, and we'll call our `main` procedure:
```nim
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
waitFor(main())
```
You can now run this program using `nim c -r part1.nim`, and you should see the dialing sequence, ending with a ping output.
In the [next tutorial](tutorial_2_customproto.md), we'll look at how to create our own custom protocol.

View File

@@ -1,95 +0,0 @@
## # Simple ping tutorial
##
## Hi all, welcome to the first nim-libp2p tutorial!
##
## !!! tips ""
## This tutorial is for everyone who is interested in building peer-to-peer applications. No Nim programming experience is needed.
##
## To give you a quick overview, **Nim** is the programming language we are using and **nim-libp2p** is the Nim implementation of [libp2p](https://libp2p.io/), a modular library that enables the development of peer-to-peer network applications.
##
## Hope you'll find it helpful in your journey of learning. Happy coding! ;)
##
## ## Before you start
## The only prerequisite here is [Nim](https://nim-lang.org/), the programming language with a Python-like syntax and a performance similar to C. Detailed information can be found [here](https://nim-lang.org/docs/tut1.html).
##
## Install Nim via their [official website](https://nim-lang.org/install.html).
## Check Nim's installation via `nim --version` and its package manager Nimble via `nimble --version`.
##
## You can now install the latest version of `nim-libp2p`:
## ```bash
## nimble install libp2p@#master
## ```
##
## ## A simple ping application
## We'll start by creating a simple application, which is starting two libp2p [switch](https://docs.libp2p.io/concepts/stream-multiplexing/#switch-swarm), and pinging each other using the [Ping](https://docs.libp2p.io/concepts/protocols/#ping) protocol.
##
## !!! tips ""
## You can find the source of this tutorial (and other tutorials) in the [libp2p/examples](https://github.com/status-im/nim-libp2p/tree/master/examples) folder!
##
## Let's create a `part1.nim`, and import our dependencies:
import chronos
import libp2p
import libp2p/protocols/ping
## [chronos](https://github.com/status-im/nim-chronos) the asynchronous framework used by `nim-libp2p`
##
## Next, we'll create an helper procedure to create our switches. A switch needs a bit of configuration, and it will be easier to do this configuration only once:
proc createSwitch(ma: MultiAddress, rng: ref HmacDrbgContext): Switch =
var switch = SwitchBuilder
.new()
.withRng(rng) # Give the application RNG
.withAddress(ma) # Our local address(es)
.withTcpTransport() # Use TCP as transport
.withMplex() # Use Mplex as muxer
.withNoise() # Use Noise as secure manager
.build()
return switch
## This will create a switch using [Mplex](https://docs.libp2p.io/concepts/stream-multiplexing/) as a multiplexer, Noise to secure the communication, and TCP as an underlying transport.
##
## You can of course tweak this, to use a different or multiple transport, or tweak the configuration of Mplex and Noise, but this is some sane defaults that we'll use going forward.
##
##
## Let's now start to create our main procedure:
proc main() {.async, gcsafe.} =
let
rng = newRng()
localAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
pingProtocol = Ping.new(rng=rng)
## We created some variables that we'll need for the rest of the application: the global `rng` instance, our `localAddress`, and an instance of the `Ping` protocol.
## The address is in the [MultiAddress](https://github.com/multiformats/multiaddr) format. The port `0` means "take any port available".
##
## `tryGet` is procedure which is part of [nim-result](https://github.com/arnetheduck/nim-result/), that will throw an exception if the supplied MultiAddress is invalid.
##
## We can now create our two switches:
let
switch1 = createSwitch(localAddress, rng)
switch2 = createSwitch(localAddress, rng)
switch1.mount(pingProtocol)
await switch1.start()
await switch2.start()
## We've **mounted** the `pingProtocol` on our first switch. This means that the first switch will actually listen for any ping requests coming in, and handle them accordingly.
##
## Now that we've started the nodes, they are listening for incoming peers.
## We can find out which port was attributed, and the resulting local addresses, by using `switch1.peerInfo.addrs`.
##
## We'll **dial** the first switch from the second one, by specifying it's **Peer ID**, it's **MultiAddress** and the **`Ping` protocol codec**:
let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, PingCodec)
## We now have a `Ping` connection setup between the second and the first switch, we can use it to actually ping the node:
# ping the other node and echo the ping duration
echo "ping: ", await pingProtocol.ping(conn)
# We must close the connection ourselves when we're done with it
await conn.close()
## And that's it! Just a little bit of cleanup: shutting down the switches, waiting for them to stop, and we'll call our `main` procedure:
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
waitFor(main())
## You can now run this program using `nim c -r part1.nim`, and you should see the dialing sequence, ending with a ping output.
##
## In the [next tutorial](tutorial_2_customproto.md), we'll look at how to create our own custom protocol.

View File

@@ -0,0 +1,82 @@
# Custom protocol in libp2p
In the [previous tutorial](tutorial_1_connect.md), we've looked at how to create a simple ping program using the `nim-libp2p`.
We'll now look at how to create a custom protocol inside the libp2p
Let's create a `part2.nim`, and import our dependencies:
```nim
import chronos
import stew/byteutils
import libp2p
```
This is similar to the first tutorial, except we don't need to import the `Ping` protocol.
Next, we'll declare our custom protocol
```nim
const TestCodec = "/test/proto/1.0.0"
type TestProto = ref object of LPProtocol
```
We've set a [protocol ID](https://docs.libp2p.io/concepts/protocols/#protocol-ids), and created a custom `LPProtocol`. In a more complex protocol, we could use this structure to store interesting variables.
A protocol generally has two part: and handling/server part, and a dialing/client part.
Theses two parts can be identical, but in our trivial protocol, the server will wait for a message from the client, and the client will send a message, so we have to handle the two cases separately.
Let's start with the server part:
```nim
proc new(T: typedesc[TestProto]): T =
# every incoming connections will in be handled in this closure
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
# Read up to 1024 bytes from this connection, and transform them into
# a string
echo "Got from remote - ", string.fromBytes(await conn.readLp(1024))
# We must close the connections ourselves when we're done with it
await conn.close()
return T(codecs: @[TestCodec], handler: handle)
```
This is a constructor for our `TestProto`, that will specify our `codecs` and a `handler`, which will be called for each incoming peer asking for this protocol.
In our handle, we simply read a message from the connection and `echo` it.
We can now create our client part:
```nim
proc hello(p: TestProto, conn: Connection) {.async.} =
await conn.writeLp("Hello p2p!")
```
Again, pretty straight-forward, we just send a message on the connection.
We can now create our main procedure:
```nim
proc main() {.async, gcsafe.} =
let
rng = newRng()
testProto = TestProto.new()
switch1 = newStandardSwitch(rng=rng)
switch2 = newStandardSwitch(rng=rng)
switch1.mount(testProto)
await switch1.start()
await switch2.start()
let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec)
await testProto.hello(conn)
# We must close the connection ourselves when we're done with it
await conn.close()
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
```
This is very similar to the first tutorial's `main`, the only noteworthy difference is that we use `newStandardSwitch`, which is similar to the `createSwitch` of the first tutorial, but is bundled directly in libp2p
We can now wrap our program by calling our main proc:
```nim
waitFor(main())
```
And that's it!

View File

@@ -1,74 +0,0 @@
## # Custom protocol in libp2p
##
## In the [previous tutorial](tutorial_1_connect.md), we've looked at how to create a simple ping program using the `nim-libp2p`.
##
## We'll now look at how to create a custom protocol inside the libp2p
##
## Let's create a `part2.nim`, and import our dependencies:
import chronos
import stew/byteutils
import libp2p
## This is similar to the first tutorial, except we don't need to import the `Ping` protocol.
##
## Next, we'll declare our custom protocol
const TestCodec = "/test/proto/1.0.0"
type TestProto = ref object of LPProtocol
## We've set a [protocol ID](https://docs.libp2p.io/concepts/protocols/#protocol-ids), and created a custom `LPProtocol`. In a more complex protocol, we could use this structure to store interesting variables.
##
## A protocol generally has two part: and handling/server part, and a dialing/client part.
## Theses two parts can be identical, but in our trivial protocol, the server will wait for a message from the client, and the client will send a message, so we have to handle the two cases separately.
##
## Let's start with the server part:
proc new(T: typedesc[TestProto]): T =
# every incoming connections will in be handled in this closure
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
# Read up to 1024 bytes from this connection, and transform them into
# a string
echo "Got from remote - ", string.fromBytes(await conn.readLp(1024))
# We must close the connections ourselves when we're done with it
await conn.close()
return T(codecs: @[TestCodec], handler: handle)
## This is a constructor for our `TestProto`, that will specify our `codecs` and a `handler`, which will be called for each incoming peer asking for this protocol.
## In our handle, we simply read a message from the connection and `echo` it.
##
## We can now create our client part:
proc hello(p: TestProto, conn: Connection) {.async.} =
await conn.writeLp("Hello p2p!")
## Again, pretty straight-forward, we just send a message on the connection.
##
## We can now create our main procedure:
proc main() {.async, gcsafe.} =
let
rng = newRng()
testProto = TestProto.new()
switch1 = newStandardSwitch(rng=rng)
switch2 = newStandardSwitch(rng=rng)
switch1.mount(testProto)
await switch1.start()
await switch2.start()
let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec)
await testProto.hello(conn)
# We must close the connection ourselves when we're done with it
await conn.close()
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
## This is very similar to the first tutorial's `main`, the only noteworthy difference is that we use `newStandardSwitch`, which is similar to the `createSwitch` of the first tutorial, but is bundled directly in libp2p
##
## We can now wrap our program by calling our main proc:
waitFor(main())
## And that's it!
## In the [next tutorial](tutorial_3_protobuf.md), we'll create a more complex protocol using Protobuf.

View File

@@ -1,162 +0,0 @@
## # Protobuf usage
##
## In the [previous tutorial](tutorial_2_customproto.md), we created a simple "ping" protocol.
## Most real protocol want their messages to be structured and extensible, which is why
## most real protocols use [protobuf](https://developers.google.com/protocol-buffers) to
## define their message structures.
##
## Here, we'll create a slightly more complex protocol, which parses & generate protobuf
## messages. Let's start by importing our dependencies, as usual:
import chronos
import stew/results # for Opt[T]
import libp2p
## ## Protobuf encoding & decoding
## This will be the structure of our messages:
## ```protobuf
## message MetricList {
## message Metric {
## string name = 1;
## float value = 2;
## }
##
## repeated Metric metrics = 2;
## }
## ```
## We'll create our protobuf types, encoders & decoders, according to this format.
## To create the encoders & decoders, we are going to use minprotobuf
## (included in libp2p).
##
## While more modern technics
## (such as [nim-protobuf-serialization](https://github.com/status-im/nim-protobuf-serialization))
## exists, minprotobuf is currently the recommended method to handle protobuf, since it has
## been used in production extensively, and audited.
type
Metric = object
name: string
value: float
MetricList = object
metrics: seq[Metric]
{.push raises: [].}
proc encode(m: Metric): ProtoBuffer =
result = initProtoBuffer()
result.write(1, m.name)
result.write(2, m.value)
result.finish()
proc decode(_: type Metric, buf: seq[byte]): Result[Metric, ProtoError] =
var res: Metric
let pb = initProtoBuffer(buf)
# "getField" will return a Result[bool, ProtoError].
# The Result will hold an error if the protobuf is invalid.
# The Result will hold "false" if the field is missing
#
# We are just checking the error, and ignoring whether the value
# is present or not (default values are valid).
discard ? pb.getField(1, res.name)
discard ? pb.getField(2, res.value)
ok(res)
proc encode(m: MetricList): ProtoBuffer =
result = initProtoBuffer()
for metric in m.metrics:
result.write(1, metric.encode())
result.finish()
proc decode(_: type MetricList, buf: seq[byte]): Result[MetricList, ProtoError] =
var
res: MetricList
metrics: seq[seq[byte]]
let pb = initProtoBuffer(buf)
discard ? pb.getRepeatedField(1, metrics)
for metric in metrics:
res.metrics &= ? Metric.decode(metric)
ok(res)
## ## Results instead of exceptions
## As you can see, this part of the program also uses Results instead of exceptions for error handling.
## We start by `{.push raises: [].}`, which will prevent every non-async function from raising
## exceptions.
##
## Then, we use [nim-result](https://github.com/arnetheduck/nim-result) to convey
## errors to function callers. A `Result[T, E]` will either hold a valid result of type
## T, or an error of type E.
##
## You can check if the call succeeded by using `res.isOk`, and then get the
## value using `res.value` or the error by using `res.error`.
##
## Another useful tool is `?`, which will unpack a Result if it succeeded,
## or if it failed, exit the current procedure returning the error.
##
## nim-result is packed with other functionalities that you'll find in the
## nim-result repository.
##
## Results and exception are generally interchangeable, but have different semantics
## that you may or may not prefer.
##
## ## Creating the protocol
## We'll next create a protocol, like in the last tutorial, to request these metrics from our host
type
MetricCallback = proc: Future[MetricList] {.raises: [], gcsafe.}
MetricProto = ref object of LPProtocol
metricGetter: MetricCallback
proc new(_: typedesc[MetricProto], cb: MetricCallback): MetricProto =
let res = MetricProto(metricGetter: cb)
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
let
metrics = await res.metricGetter()
asProtobuf = metrics.encode()
await conn.writeLp(asProtobuf.buffer)
await conn.close()
res.codecs = @["/metric-getter/1.0.0"]
res.handler = handle
return res
proc fetch(p: MetricProto, conn: Connection): Future[MetricList] {.async.} =
let protobuf = await conn.readLp(2048)
# tryGet will raise an exception if the Result contains an error.
# It's useful to bridge between exception-world and result-world
return MetricList.decode(protobuf).tryGet()
## We can now create our main procedure:
proc main() {.async, gcsafe.} =
let rng = newRng()
proc randomMetricGenerator: Future[MetricList] {.async.} =
let metricCount = rng[].generate(uint32) mod 16
for i in 0 ..< metricCount + 1:
result.metrics.add(Metric(
name: "metric_" & $i,
value: float(rng[].generate(uint16)) / 1000.0
))
return result
let
metricProto1 = MetricProto.new(randomMetricGenerator)
metricProto2 = MetricProto.new(randomMetricGenerator)
switch1 = newStandardSwitch(rng=rng)
switch2 = newStandardSwitch(rng=rng)
switch1.mount(metricProto1)
await switch1.start()
await switch2.start()
let
conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, metricProto2.codecs)
metrics = await metricProto2.fetch(conn)
await conn.close()
for metric in metrics.metrics:
echo metric.name, " = ", metric.value
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
waitFor(main())
## If you run this program, you should see random metrics being sent from the switch1 to the switch2.

View File

@@ -1,167 +0,0 @@
## # Discovery method
##
## In the [previous tutorial](tutorial_4_gossipsub.md), we built a custom protocol using [protobuf](https://developers.google.com/protocol-buffers) and
## spread informations (some metrics) on the network using gossipsub.
## For this tutorial, on the other hand, we'll go back on a simple example
## we'll try to discover a specific peer to ping on the network.
##
## First, as usual, we import the dependencies:
import chronos
import stew/byteutils
import libp2p
import libp2p/protocols/rendezvous
import libp2p/discovery/rendezvousinterface
import libp2p/discovery/discoverymngr
## We'll not use newStandardSwitch this time as we need the discovery protocol
## [RendezVous](https://github.com/libp2p/specs/blob/master/rendezvous/README.md) to be mounted on the switch using withRendezVous.
##
## Note that other discovery methods such as [Kademlia](https://github.com/libp2p/specs/blob/master/kad-dht/README.md) or [discv5](https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md) exist (not everyone
## of those are implemented yet though), but to make it simple for this tutorial,
## we'll stick to RendezVous.
proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
SwitchBuilder.new()
.withRng(newRng())
.withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
.withTcpTransport()
.withMplex()
.withNoise()
.withRendezVous(rdv)
.build()
const DumbCodec = "/dumb/proto/1.0.0"
type DumbProto = ref object of LPProtocol
proc new(T: typedesc[DumbProto], nodeNumber: int): T =
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
echo "Node", nodeNumber, " received: ", string.fromBytes(await conn.readLp(1024))
await conn.close()
return T(codecs: @[DumbCodec], handler: handle)
## We can create our main procedure:
proc main() {.async, gcsafe.} =
# First of all, we'll create and start a boot node that'll be
# used as an information hub on the network
let bootNode = createSwitch()
await bootNode.start()
var
switches: seq[Switch] = @[]
discManagers: seq[DiscoveryManager] = @[]
for i in 0..5:
# Create a RendezVous Protocol
let rdv = RendezVous.new()
# Create a switch, mount the RendezVous protocol to it, start it and
# eventually connect it to the bootNode
let switch = createSwitch(rdv)
switch.mount(DumbProto.new(i))
await switch.start()
await switch.connect(bootNode.peerInfo.peerId, bootNode.peerInfo.addrs)
switches.add(switch)
# A discovery manager is a simple tool, you set it up by adding discovery
# interfaces (such as RendezVousInterface) then you can use it to advertise
# something on the network or to request something from it.
let dm = DiscoveryManager()
# A RendezVousInterface is a RendezVous protocol wrapped to be usable by the
# DiscoveryManager. You can initialize a time to advertise/request (tta/ttr)
# for advertise/request every x seconds.
dm.add(RendezVousInterface.new(rdv))
# Each nodes of the network'll advertise on some topics (even gang or odd club)
if i mod 2 == 0:
dm.advertise(RdvNamespace("Even Gang"))
else:
dm.advertise(RdvNamespace("Odd Club"))
discManagers.add(dm)
let
rdv = RendezVous.new()
newcomer = createSwitch(rdv)
dm = DiscoveryManager()
await newcomer.start()
await newcomer.connect(bootNode.peerInfo.peerId, bootNode.peerInfo.addrs)
dm.add(RendezVousInterface.new(rdv, ttr = 250.milliseconds))
let query = dm.request(RdvNamespace("Odd Club"))
for _ in 0..2:
echo "start"
let
res = await query.getPeer()
echo "0 ", res[PeerId], " ", res.getAll(MultiAddress)
let
conn = await newcomer.dial(res[PeerId], res.getAll(MultiAddress), DumbCodec)
echo "1"
await conn.writeLp("Odd Club suuuucks! Even Gang is better!")
echo "2"
await sleepAsync(300.milliseconds)
await conn.close()
echo "stop"
query.stop()
# # Create a "network" of six nodes looking like that:
# # node0 <-> node1 <-> node2 <-> node3 <-> node4 <-> node5
# var
# switches: seq[Switch] = @[]
# discManagers: seq[DiscoveryManager] = @[]
# for _ in 0..5:
# # Create a RendezVous Protocol
# let rdv = RendezVous.new()
# # Create a switch, mount the RendezVous protocol to it, start it and
# # enventually connect it to the previous node in the sequence
# let switch = createSwitch(rdv)
# if switches.len == 0:
#
# await switch.start()
# if switches.len > 0:
# await switches[^1].connect(switch.peerInfo.peerId, switch.peerInfo.addrs)
# switches.add(switch)
#
# # A discovery manager is a simple tool, you setup it by adding discovery
# # interfaces (such as RendezVousInterface) then you can use it to advertise
# # something on the network or to request something from it.
# let dm = DiscoveryManager()
# # A RendezVousInterface is a RendezVous protocol wrapped to be usable by the
# # DiscoveryManager. You can initialize a time to advertise/request (tta/ttr)
# # for advertise/request every x seconds.
# dm.add(RendezVousInterface.new(rdv, ttr = 250.milliseconds))
# discManagers.add(dm)
#
# # The first node of the network advertise on someTopic
# discManagers[0].advertise(RdvNamespace("someTopic"))
# for i in 1..4:
# # All the four "middle" nodes request informations on someTopic
# # to make the message spread. With a time to request of 250ms, it should
# # spread on the four nodes in a little over 1 second.
# discard discManagers[i].request(RdvNamespace("someTopic"))
# let
# # The ending node request on someTopic aswell and await for information.
# query = discManagers[5].request(RdvNamespace("someTopic"))
# # getPeer give you a PeerAttribute containing informations about the peer.
# # The most useful are the PeerId and the MultiAddresses
# res = await query.getPeer()
#
# # Now that a peer have been found, we can stop the query
# query.stop()
#
# doAssert(res[PeerId] == switches[0].peerInfo.peerId)
# let resMa = res.getAll(MultiAddress)
# for ma in resMa:
# doAssert(ma in switches[0].peerInfo.addrs)
# echo "Peer Found: ", res[PeerId], " ", resMa
# # The peer Id and the address of the node0 is discovered by the node5 after
# # being pass by all the intermediate node.
#
# # The node5 can connect the node0 to talk about someTopic directly.
#
# # Stop all the DiscoveryManagers
# for dm in discManagers: dm.stop()
#
# # Stop all the switches
# let futStop = switches.mapIt(it.stop())
# await allFutures(futStop)
waitFor(main())

View File

@@ -9,7 +9,7 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
requires "nim >= 1.2.0",
"nimcrypto >= 0.4.1",
"dnsclient >= 0.3.0 & < 0.4.0",
"dnsclient >= 0.1.2",
"bearssl >= 0.1.4",
"chronicles >= 0.10.2",
"chronos >= 3.0.6",
@@ -32,16 +32,16 @@ proc runTest(filename: string, verify: bool = true, sign: bool = true,
rmFile "tests/" & filename.toExe
proc buildSample(filename: string, run = false) =
var excstr = "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off -p:. "
var excstr = "nim c --opt:speed --threads:on -d:debug --verbosity:0 --hints:off "
excstr.add(" examples/" & filename)
exec excstr
if run:
exec "./examples/" & filename.toExe
rmFile "examples/" & filename.toExe
proc tutorialToMd(filename: string) =
let markdown = gorge "cat " & filename & " | nim c -r --verbosity:0 --hints:off tools/markdown_builder.nim "
writeFile(filename.replace(".nim", ".md"), markdown)
proc buildTutorial(filename: string) =
discard gorge "cat " & filename & " | nim c -r --hints:off tools/markdown_runner.nim | " &
" nim --verbosity:0 --hints:off c -"
task testnative, "Runs libp2p native tests":
runTest("testnative")
@@ -86,24 +86,12 @@ task test_slim, "Runs the (slimmed down) test suite":
exec "nimble testfilter"
exec "nimble examples_build"
task website, "Build the website":
tutorialToMd("examples/tutorial_1_connect.nim")
tutorialToMd("examples/tutorial_2_customproto.nim")
tutorialToMd("examples/tutorial_3_protobuf.nim")
tutorialToMd("examples/tutorial_5_discovery.nim")
tutorialToMd("examples/circuitrelay.nim")
exec "mkdocs build"
task examples_build, "Build the samples":
buildSample("directchat")
buildSample("helloworld", true)
buildSample("circuitrelay", true)
buildSample("tutorial_1_connect", true)
buildSample("tutorial_2_customproto", true)
if (NimMajor, NimMinor) > (1, 2):
# This tutorial relies on post 1.4 exception tracking
buildSample("tutorial_3_protobuf", true)
buildSample("tutorial_5_discovery", true)
buildTutorial("examples/tutorial_1_connect.md")
buildTutorial("examples/tutorial_2_customproto.md")
# pin system
# while nimble lockfile

View File

@@ -26,7 +26,7 @@ import
switch, peerid, peerinfo, stream/connection, multiaddress,
crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex, yamux/yamux],
protocols/[identify, secure/secure, secure/noise, rendezvous],
protocols/[identify, secure/secure, secure/noise],
protocols/connectivity/[autonat, relay/relay, relay/client, relay/rtransport],
connmanager, upgrademngrs/muxedupgrade,
nameresolving/nameresolver,
@@ -60,7 +60,6 @@ type
peerStoreCapacity: Option[int]
autonat: bool
circuitRelay: Relay
rdv: RendezVous
proc new*(T: type[SwitchBuilder]): T {.public.} =
## Creates a SwitchBuilder
@@ -195,10 +194,6 @@ proc withCircuitRelay*(b: SwitchBuilder, r: Relay = Relay.new()): SwitchBuilder
b.circuitRelay = r
b
proc withRendezVous*(b: SwitchBuilder, rdv: RendezVous = RendezVous.new()): SwitchBuilder =
b.rdv = rdv
b
proc build*(b: SwitchBuilder): Switch
{.raises: [Defect, LPError], public.} =
@@ -266,10 +261,6 @@ proc build*(b: SwitchBuilder): Switch
b.circuitRelay.setup(switch)
switch.mount(b.circuitRelay)
if not isNil(b.rdv):
b.rdv.setup(switch)
switch.mount(b.rdv)
return switch
proc newStandardSwitch*(

View File

@@ -13,13 +13,10 @@ else:
{.push raises: [].}
import chronos
import stew/results
import peerid,
stream/connection,
transports/transport
export results
type
Dial* = ref object of RootObj

View File

@@ -7,9 +7,8 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/[sugar, tables, sequtils]
import std/[sugar, tables]
import stew/results
import pkg/[chronos,
chronicles,
metrics]
@@ -17,7 +16,6 @@ import pkg/[chronos,
import dial,
peerid,
peerinfo,
multicodec,
multistream,
connmanager,
stream/connection,
@@ -26,7 +24,7 @@ import dial,
upgrademngrs/upgrade,
errors
export dial, errors, results
export dial, errors
logScope:
topics = "libp2p dialer"
@@ -47,105 +45,56 @@ type
transports: seq[Transport]
nameResolver: NameResolver
proc dialAndUpgrade(
self: Dialer,
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress):
Future[Connection] {.async.} =
for transport in self.transports: # for each transport
if transport.handles(address): # check if it can dial it
trace "Dialing address", address, peerId, hostname
let dialed =
try:
libp2p_total_dial_attempts.inc()
await transport.dial(hostname, address)
except CancelledError as exc:
debug "Dialing canceled", msg = exc.msg, peerId
raise exc
except CatchableError as exc:
debug "Dialing failed", msg = exc.msg, peerId
libp2p_failed_dials.inc()
return nil # Try the next address
# also keep track of the connection's bottom unsafe transport direction
# required by gossipsub scoring
dialed.transportDir = Direction.Out
libp2p_successful_dials.inc()
let conn =
try:
await transport.upgradeOutgoing(dialed, peerId)
except CatchableError as exc:
# If we failed to establish the connection through one transport,
# we won't succeeded through another - no use in trying again
await dialed.close()
debug "Upgrade failed", msg = exc.msg, peerId
if exc isnot CancelledError:
libp2p_failed_upgrades_outgoing.inc()
# Try other address
return nil
doAssert not isNil(conn), "connection died after upgradeOutgoing"
debug "Dial successful", conn, peerId = conn.peerId
return conn
return nil
proc expandDnsAddr(
self: Dialer,
peerId: Opt[PeerId],
address: MultiAddress): Future[seq[(MultiAddress, Opt[PeerId])]] {.async.} =
if not DNSADDR.matchPartial(address): return @[(address, peerId)]
if isNil(self.nameResolver):
info "Can't resolve DNSADDR without NameResolver", ma=address
return @[]
let
toResolve =
if peerId.isSome:
address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet()
else:
address
resolved = await self.nameResolver.resolveDnsAddr(toResolve)
for resolvedAddress in resolved:
let lastPart = resolvedAddress[^1].tryGet()
if lastPart.protoCode == Result[MultiCodec, string].ok(multiCodec("p2p")):
let
peerIdBytes = lastPart.protoArgument().tryGet()
addrPeerId = PeerId.init(peerIdBytes).tryGet()
result.add((resolvedAddress[0..^2].tryGet(), Opt.some(addrPeerId)))
else:
result.add((resolvedAddress, peerId))
proc dialAndUpgrade(
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress]):
Future[Connection] {.async.} =
debug "Dialing peer", peerId
for rawAddress in addrs:
# resolve potential dnsaddr
let addresses = await self.expandDnsAddr(peerId, rawAddress)
for address in addrs: # for each address
let
hostname = address.getHostname()
resolvedAddresses =
if isNil(self.nameResolver): @[address]
else: await self.nameResolver.resolveMAddress(address)
for (expandedAddress, addrPeerId) in addresses:
# DNS resolution
let
hostname = expandedAddress.getHostname()
resolvedAddresses =
if isNil(self.nameResolver): @[expandedAddress]
else: await self.nameResolver.resolveMAddress(expandedAddress)
for a in resolvedAddresses: # for each resolved address
for transport in self.transports: # for each transport
if transport.handles(a): # check if it can dial it
trace "Dialing address", address = $a, peerId, hostname
let dialed = try:
libp2p_total_dial_attempts.inc()
await transport.dial(hostname, a)
except CancelledError as exc:
debug "Dialing canceled", msg = exc.msg, peerId
raise exc
except CatchableError as exc:
debug "Dialing failed", msg = exc.msg, peerId
libp2p_failed_dials.inc()
continue # Try the next address
for resolvedAddress in resolvedAddresses:
result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress)
if not isNil(result):
return result
# also keep track of the connection's bottom unsafe transport direction
# required by gossipsub scoring
dialed.transportDir = Direction.Out
libp2p_successful_dials.inc()
let conn = try:
await transport.upgradeOutgoing(dialed, peerId)
except CatchableError as exc:
# If we failed to establish the connection through one transport,
# we won't succeeded through another - no use in trying again
# TODO we should try another address though
await dialed.close()
debug "Upgrade failed", msg = exc.msg, peerId
if exc isnot CancelledError:
libp2p_failed_upgrades_outgoing.inc()
raise exc
doAssert not isNil(conn), "connection died after upgradeOutgoing"
debug "Dial successful", conn, peerId = conn.peerId
return conn
proc internalConnect(
self: Dialer,

View File

@@ -1,163 +0,0 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/sequtils
import chronos, chronicles, stew/results
import ../errors
type
BaseAttr = ref object of RootObj
comparator: proc(f, c: BaseAttr): bool {.gcsafe, raises: [Defect].}
Attribute[T] = ref object of BaseAttr
value: T
PeerAttributes* = object
attributes: seq[BaseAttr]
DiscoveryService* = distinct string
proc `==`*(a, b: DiscoveryService): bool {.borrow.}
proc ofType*[T](f: BaseAttr, _: type[T]): bool =
return f of Attribute[T]
proc to*[T](f: BaseAttr, _: type[T]): T =
Attribute[T](f).value
proc add*[T](pa: var PeerAttributes,
value: T) =
pa.attributes.add(Attribute[T](
value: value,
comparator: proc(f: BaseAttr, c: BaseAttr): bool =
f.ofType(T) and c.ofType(T) and f.to(T) == c.to(T)
)
)
iterator items*(pa: PeerAttributes): BaseAttr =
for f in pa.attributes:
yield f
proc getAll*[T](pa: PeerAttributes, t: typedesc[T]): seq[T] =
for f in pa.attributes:
if f.ofType(T):
result.add(f.to(T))
proc `{}`*[T](pa: PeerAttributes, t: typedesc[T]): Opt[T] =
for f in pa.attributes:
if f.ofType(T):
return Opt.some(f.to(T))
Opt.none(T)
proc `[]`*[T](pa: PeerAttributes, t: typedesc[T]): T {.raises: [Defect, KeyError].} =
pa{T}.valueOr: raise newException(KeyError, "Attritute not found")
proc match*(pa, candidate: PeerAttributes): bool =
for f in pa.attributes:
block oneAttribute:
for field in candidate.attributes:
if field.comparator(field, f):
break oneAttribute
return false
return true
type
PeerFoundCallback* = proc(pa: PeerAttributes) {.raises: [Defect], gcsafe.}
DiscoveryInterface* = ref object of RootObj
onPeerFound*: PeerFoundCallback
toAdvertise*: PeerAttributes
advertisementUpdated*: AsyncEvent
advertiseLoop*: Future[void]
method request*(self: DiscoveryInterface, pa: PeerAttributes) {.async, base.} =
doAssert(false, "Not implemented!")
method advertise*(self: DiscoveryInterface) {.async, base.} =
doAssert(false, "Not implemented!")
type
DiscoveryError* = object of LPError
DiscoveryQuery* = ref object
attr: PeerAttributes
peers: AsyncQueue[PeerAttributes]
futs: seq[Future[void]]
DiscoveryManager* = ref object
interfaces: seq[DiscoveryInterface]
queries: seq[DiscoveryQuery]
proc add*(dm: DiscoveryManager, di: DiscoveryInterface) =
dm.interfaces &= di
di.onPeerFound = proc (pa: PeerAttributes) =
for query in dm.queries:
if query.attr.match(pa):
try:
query.peers.putNoWait(pa)
except AsyncQueueFullError as exc:
debug "Cannot push discovered peer to queue"
proc request*(dm: DiscoveryManager, pa: PeerAttributes): DiscoveryQuery =
var query = DiscoveryQuery(attr: pa, peers: newAsyncQueue[PeerAttributes]())
for i in dm.interfaces:
query.futs.add(i.request(pa))
dm.queries.add(query)
dm.queries.keepItIf(it.futs.anyIt(not it.finished()))
return query
proc request*[T](dm: DiscoveryManager, value: T): DiscoveryQuery =
var pa: PeerAttributes
pa.add(value)
return dm.request(pa)
proc advertise*(dm: DiscoveryManager, pa: PeerAttributes) =
for i in dm.interfaces:
i.toAdvertise = pa
if i.advertiseLoop.isNil:
i.advertisementUpdated = newAsyncEvent()
i.advertiseLoop = i.advertise()
else:
i.advertisementUpdated.fire()
proc advertise*[T](dm: DiscoveryManager, value: T) =
var pa: PeerAttributes
pa.add(value)
dm.advertise(pa)
proc stop*(query: DiscoveryQuery) =
for r in query.futs:
if not r.finished(): r.cancel()
proc stop*(dm: DiscoveryManager) =
for q in dm.queries:
q.stop()
for i in dm.interfaces:
if isNil(i.advertiseLoop): continue
i.advertiseLoop.cancel()
proc getPeer*(query: DiscoveryQuery): Future[PeerAttributes] {.async.} =
let getter = query.peers.popFirst()
try:
await getter or allFinished(query.futs)
except CancelledError as exc:
getter.cancel()
raise exc
if not finished(getter):
# discovery loops only finish when they don't handle the query
raise newException(DiscoveryError, "Unable to find any peer matching this request")
return await getter

View File

@@ -1,77 +0,0 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import sequtils
import chronos
import ./discoverymngr,
../protocols/rendezvous,
../peerid
type
RendezVousInterface* = ref object of DiscoveryInterface
rdv*: RendezVous
timeToRequest: Duration
timeToAdvertise: Duration
RdvNamespace* = distinct string
proc `==`*(a, b: RdvNamespace): bool {.borrow.}
method request*(self: RendezVousInterface, pa: PeerAttributes) {.async.} =
var namespace = ""
for attr in pa:
if attr.ofType(RdvNamespace):
namespace = string attr.to(RdvNamespace)
elif attr.ofType(DiscoveryService):
namespace = string attr.to(DiscoveryService)
elif attr.ofType(PeerId):
namespace = $attr.to(PeerId)
else:
# unhandled type
return
while true:
for pr in await self.rdv.request(namespace):
var peer: PeerAttributes
peer.add(pr.peerId)
for address in pr.addresses:
peer.add(address.address)
peer.add(DiscoveryService(namespace))
peer.add(RdvNamespace(namespace))
self.onPeerFound(peer)
await sleepAsync(self.timeToRequest)
method advertise*(self: RendezVousInterface) {.async.} =
while true:
var toAdvertise: seq[string]
for attr in self.toAdvertise:
if attr.ofType(RdvNamespace):
toAdvertise.add string attr.to(RdvNamespace)
elif attr.ofType(DiscoveryService):
toAdvertise.add string attr.to(DiscoveryService)
elif attr.ofType(PeerId):
toAdvertise.add $attr.to(PeerId)
self.advertisementUpdated.clear()
for toAdv in toAdvertise:
await self.rdv.advertise(toAdv, self.timeToAdvertise)
await sleepAsync(self.timeToAdvertise) or self.advertisementUpdated.wait()
proc new*(T: typedesc[RendezVousInterface],
rdv: RendezVous,
ttr: Duration = 1.minutes,
tta: Duration = MinimumDuration): RendezVousInterface =
T(rdv: rdv, timeToRequest: ttr, timeToAdvertise: tta)

View File

@@ -573,7 +573,7 @@ proc protoArgument*(ma: MultiAddress,
err("multiaddress: Decoding protocol error")
else:
ok(res)
elif proto.kind in {MAKind.Length, Path}:
elif proto.kind in {Length, Path}:
if vb.data.readSeq(buffer) == -1:
err("multiaddress: Decoding protocol error")
else:
@@ -594,13 +594,6 @@ proc protoAddress*(ma: MultiAddress): MaResult[seq[byte]] =
buffer.setLen(res)
ok(buffer)
proc protoArgument*(ma: MultiAddress): MaResult[seq[byte]] =
## Returns MultiAddress ``ma`` protocol address binary blob.
##
## If current MultiAddress do not have argument value, then result array will
## be empty.
ma.protoAddress()
proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] =
var header: uint64
var data = newSeq[byte]()
@@ -608,9 +601,6 @@ proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] =
var vb = ma
var res: MultiAddress
res.data = initVBuffer()
if index < 0: return err("multiaddress: negative index gived to getPart")
while offset <= index:
if vb.data.readVarint(header) == -1:
return err("multiaddress: Malformed binary address!")
@@ -628,7 +618,7 @@ proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] =
res.data.writeVarint(header)
res.data.writeArray(data)
res.data.finish()
elif proto.kind in {MAKind.Length, Path}:
elif proto.kind in {Length, Path}:
if vb.data.readSeq(data) == -1:
return err("multiaddress: Decoding protocol error")
@@ -657,13 +647,9 @@ proc getParts[U, V](ma: MultiAddress, slice: HSlice[U, V]): MaResult[MultiAddres
? res.append(? ma[i])
ok(res)
proc `[]`*(ma: MultiAddress, i: int | BackwardsIndex): MaResult[MultiAddress] {.inline.} =
proc `[]`*(ma: MultiAddress, i: int): MaResult[MultiAddress] {.inline.} =
## Returns part with index ``i`` of MultiAddress ``ma``.
when i is BackwardsIndex:
let maLength = ? len(ma)
ma.getPart(maLength - int(i))
else:
ma.getPart(i)
ma.getPart(i)
proc `[]`*(ma: MultiAddress, slice: HSlice): MaResult[MultiAddress] {.inline.} =
## Returns parts with slice ``slice`` of MultiAddress ``ma``.
@@ -694,7 +680,7 @@ iterator items*(ma: MultiAddress): MaResult[MultiAddress] =
res.data.writeVarint(header)
res.data.writeArray(data)
elif proto.kind in {MAKind.Length, Path}:
elif proto.kind in {Length, Path}:
if vb.data.readSeq(data) == -1:
yield err(MaResult[MultiAddress], "Decoding protocol error")

View File

@@ -71,36 +71,25 @@ proc select*(m: MultistreamSelect,
else:
trace "multistream handshake success", conn
echo " 0 ", proto
if proto.len() == 0: # no protocols, must be a handshake call
return Codec
else:
echo " 1"
s = string.fromBytes(await conn.readLp(MsgSize)) # read the first proto
echo " 2"
validateSuffix(s)
echo " 3"
trace "reading first requested proto", conn
if s == proto[0]:
echo " 4.1"
trace "successfully selected ", conn, proto = proto[0]
conn.protocol = proto[0]
echo " 4.2 ", proto[0]
return proto[0]
elif proto.len > 1:
echo " 5.1"
# Try to negotiate alternatives
let protos = proto[1..<proto.len()]
trace "selecting one of several protos", conn, protos = protos
echo " 5.2"
for p in protos:
echo " 5.2.1"
trace "selecting proto", conn, proto = p
await conn.writeLp((p & "\n")) # select proto
echo " 5.2.2"
s = string.fromBytes(await conn.readLp(MsgSize)) # read the first proto
validateSuffix(s)
echo " 5.2.3"
if s == p:
trace "selected protocol", conn, protocol = s
conn.protocol = s
@@ -113,7 +102,6 @@ proc select*(m: MultistreamSelect,
proc select*(m: MultistreamSelect,
conn: Connection,
proto: string): Future[bool] {.async.} =
echo proto
if proto.len > 0:
return (await m.select(conn, @[proto])) == proto
else:

View File

@@ -14,7 +14,7 @@ else:
import
std/[streams, strutils, sets, sequtils],
chronos, chronicles, stew/byteutils,
chronos, chronicles,
dnsclientpkg/[protocol, types]
import
@@ -76,11 +76,15 @@ proc getDnsResponse(
if not receivedDataFuture.finished:
raise newException(IOError, "DNS server timeout")
let rawResponse = sock.getMessage()
var
rawResponse = sock.getMessage()
dataStream = newStringStream()
dataStream.writeData(addr rawResponse[0], rawResponse.len)
dataStream.setPosition(0)
# parseResponse can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
return parseResponse(string.fromBytes(rawResponse))
return parseResponse(dataStream)
except CatchableError as exc: raise exc
except Exception as exc: raiseAssert exc.msg
finally:
@@ -114,14 +118,7 @@ method resolveIp*(
try:
let resp = await fut
for answer in resp.answers:
# toString can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
resolvedAddresses.incl(
try: answer.toString()
except CatchableError as exc: raise exc
except Exception as exc: raiseAssert exc.msg
)
resolvedAddresses.incl(answer.toString())
except CancelledError as e:
raise e
except ValueError as e:
@@ -161,11 +158,6 @@ method resolveTxt*(
self.nameServers.add(self.nameServers[0])
self.nameServers.delete(0)
continue
except Exception as e:
# toString can has a raises: [Exception, ..] because of
# https://github.com/nim-lang/Nim/commit/035134de429b5d99c5607c5fae912762bebb6008
# it can't actually raise though
raiseAssert e.msg
debug "Failed to resolve TXT, returning empty set"
return @[]

View File

@@ -13,7 +13,7 @@ else:
{.push raises: [].}
import std/[sugar, sets, sequtils, strutils]
import
import
chronos,
chronicles,
stew/[endians2, byteutils]
@@ -22,14 +22,14 @@ import ".."/[multiaddress, multicodec]
logScope:
topics = "libp2p nameresolver"
type
type
NameResolver* = ref object of RootObj
method resolveTxt*(
self: NameResolver,
address: string): Future[seq[string]] {.async, base.} =
## Get TXT record
##
##
doAssert(false, "Not implemented!")
@@ -39,18 +39,16 @@ method resolveIp*(
port: Port,
domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async, base.} =
## Resolve the specified address
##
##
doAssert(false, "Not implemented!")
proc getHostname*(ma: MultiAddress): string =
let
firstPart = ma[0].valueOr: return ""
fpSplitted = ($firstPart).split('/', 2)
if fpSplitted.len > 2: fpSplitted[2]
let firstPart = ($ma[0].get()).split('/')
if firstPart.len > 1: firstPart[2]
else: ""
proc resolveOneAddress(
proc resolveDnsAddress(
self: NameResolver,
ma: MultiAddress,
domain: Domain = Domain.AF_UNSPEC,
@@ -66,22 +64,29 @@ proc resolveOneAddress(
let
port = Port(fromBytesBE(uint16, pbuf))
resolvedAddresses = await self.resolveIp(prefix & dnsval, port, domain)
return collect(newSeqOfCap(4)):
for address in resolvedAddresses:
var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet()
for part in ma:
if DNS.match(part.tryGet()): continue
if DNS.match(part.get()): continue
createdAddress &= part.tryGet()
createdAddress
proc resolveDnsAddr*(
func matchDnsSuffix(m1, m2: MultiAddress): MaResult[bool] =
for partMaybe in m1:
let part = ?partMaybe
if DNS.match(part): continue
let entryProt = ?m2[?part.protoCode()]
if entryProt != part:
return ok(false)
return ok(true)
proc resolveDnsAddr(
self: NameResolver,
ma: MultiAddress,
depth: int = 0): Future[seq[MultiAddress]] {.async.} =
if not DNSADDR.matchPartial(ma):
return @[ma]
depth: int = 0): Future[seq[MultiAddress]]
{.async.} =
trace "Resolving dnsaddr", ma
if depth > 6:
@@ -99,17 +104,21 @@ proc resolveDnsAddr*(
if not entry.startsWith("dnsaddr="): continue
let entryValue = MultiAddress.init(entry[8..^1]).tryGet()
if entryValue.contains(multiCodec("p2p")).tryGet() and ma.contains(multiCodec("p2p")).tryGet():
if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]:
continue
if not matchDnsSuffix(ma, entryValue).tryGet(): continue
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
for r in resolved:
result.add(r)
# The spec is not clear wheter only DNSADDR can be recursived
# or any DNS addr. Only handling DNSADDR because it's simpler
# to avoid infinite recursion
if DNSADDR.matchPartial(entryValue):
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
for r in resolved:
result.add(r)
else:
result.add(entryValue)
if result.len == 0:
debug "Failed to resolve a DNSADDR", ma
return @[]
debug "Failed to resolve any DNSADDR", ma
return @[ma]
return result
@@ -124,15 +133,14 @@ proc resolveMAddress*(
let code = address[0].get().protoCode().get()
let seq = case code:
of multiCodec("dns"):
await self.resolveOneAddress(address)
await self.resolveDnsAddress(address)
of multiCodec("dns4"):
await self.resolveOneAddress(address, Domain.AF_INET)
await self.resolveDnsAddress(address, Domain.AF_INET)
of multiCodec("dns6"):
await self.resolveOneAddress(address, Domain.AF_INET6)
await self.resolveDnsAddress(address, Domain.AF_INET6)
of multiCodec("dnsaddr"):
await self.resolveDnsAddr(address)
else:
doAssert false
@[address]
for ad in seq:
res.incl(ad)

View File

@@ -13,7 +13,6 @@ else:
{.push raises: [].}
import std/[options, sets, sequtils]
import stew/results
import chronos, chronicles, stew/objects
import ../protocol,
../../switch,
@@ -245,9 +244,11 @@ proc handleDial(a: Autonat, conn: Connection, msg: AutonatMsg): Future[void] =
if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId:
return conn.sendResponseError(BadRequest, "PeerId mismatch")
if conn.observedAddr.isNone:
return conn.sendResponseError(BadRequest, "Missing observed address")
let observedAddr = conn.observedAddr.get()
let observedAddr =
if conn.observedAddr.isNone:
return conn.sendResponseError(BadRequest, "Missing observed address")
else:
conn.observedAddr.get()
var isRelayed = observedAddr.contains(multiCodec("p2p-circuit"))
if isRelayed.isErr() or isRelayed.get():

View File

@@ -64,17 +64,15 @@ proc bridge*(connSrc: Connection, connDst: Connection) {.async.} =
await futSrc or futDst
if futSrc.finished():
bufRead = await futSrc
if bufRead > 0:
bytesSendFromSrcToDst.inc(bufRead)
await connDst.write(@bufSrcToDst[0..<bufRead])
zeroMem(addr(bufSrcToDst), bufSrcToDst.high + 1)
bytesSendFromSrcToDst.inc(bufRead)
await connDst.write(@bufSrcToDst[0..<bufRead])
zeroMem(addr(bufSrcToDst), bufSrcToDst.high + 1)
futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.high + 1)
if futDst.finished():
bufRead = await futDst
if bufRead > 0:
bytesSendFromDstToSrc += bufRead
await connSrc.write(bufDstToSrc[0..<bufRead])
zeroMem(addr(bufDstToSrc), bufDstToSrc.high + 1)
bytesSendFromDstToSrc += bufRead
await connSrc.write(bufDstToSrc[0..<bufRead])
zeroMem(addr(bufDstToSrc), bufDstToSrc.high + 1)
futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.high + 1)
except CancelledError as exc:
raise exc

View File

@@ -16,7 +16,6 @@ else:
{.push raises: [].}
import std/[sequtils, options, strutils, sugar]
import stew/results
import chronos, chronicles
import ../protobuf/minprotobuf,
../peerinfo,

View File

@@ -12,8 +12,7 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[sequtils, strutils, tables, hashes, options]
import stew/results
import std/[sequtils, strutils, tables, hashes]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
../../peerid,

View File

@@ -1,675 +0,0 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import tables, sequtils, sugar, sets, options
import chronos,
chronicles,
bearssl/rand,
stew/[byteutils, objects]
import ./protocol,
../switch,
../routing_record,
../utils/heartbeat,
../stream/connection,
../utils/offsettedseq,
../utils/semaphore
export chronicles
logScope:
topics = "libp2p discovery rendezvous"
const
RendezVousCodec* = "/rendezvous/1.0.0"
MinimumDuration* = 2.hours
MaximumDuration = 72.hours
MinimumTTL = MinimumDuration.seconds.uint64
MaximumTTL = MaximumDuration.seconds.uint64
RegistrationLimitPerPeer = 1000
DiscoverLimit = 1000'u64
SemaphoreDefaultSize = 5
type
MessageType {.pure.} = enum
Register = 0
RegisterResponse = 1
Unregister = 2
Discover = 3
DiscoverResponse = 4
ResponseStatus = enum
Ok = 0
InvalidNamespace = 100
InvalidSignedPeerRecord = 101
InvalidTTL = 102
InvalidCookie = 103
NotAuthorized = 200
InternalError = 300
Unavailable = 400
Cookie = object
offset : uint64
ns : string
Register = object
ns : string
signedPeerRecord: seq[byte]
ttl: Option[uint64] # in seconds
RegisterResponse = object
status: ResponseStatus
text: Option[string]
ttl: Option[uint64] # in seconds
Unregister = object
ns: string
Discover = object
ns: string
limit: Option[uint64]
cookie: Option[seq[byte]]
DiscoverResponse = object
registrations: seq[Register]
cookie: Option[seq[byte]]
status: ResponseStatus
text: Option[string]
Message = object
msgType: MessageType
register: Option[Register]
registerResponse: Option[RegisterResponse]
unregister: Option[Unregister]
discover: Option[Discover]
discoverResponse: Option[DiscoverResponse]
proc encode(c: Cookie): ProtoBuffer =
result = initProtoBuffer()
result.write(1, c.offset)
result.write(2, c.ns)
result.finish()
proc encode(r: Register): ProtoBuffer =
result = initProtoBuffer()
result.write(1, r.ns)
result.write(2, r.signedPeerRecord)
if r.ttl.isSome():
result.write(3, r.ttl.get())
result.finish()
proc encode(rr: RegisterResponse): ProtoBuffer =
result = initProtoBuffer()
result.write(1, rr.status.uint)
if rr.text.isSome():
result.write(2, rr.text.get())
if rr.ttl.isSome():
result.write(3, rr.ttl.get())
result.finish()
proc encode(u: Unregister): ProtoBuffer =
result = initProtoBuffer()
result.write(1, u.ns)
result.finish()
proc encode(d: Discover): ProtoBuffer =
result = initProtoBuffer()
result.write(1, d.ns)
if d.limit.isSome():
result.write(2, d.limit.get())
if d.cookie.isSome():
result.write(3, d.cookie.get())
result.finish()
proc encode(d: DiscoverResponse): ProtoBuffer =
result = initProtoBuffer()
for reg in d.registrations:
result.write(1, reg.encode())
if d.cookie.isSome():
result.write(2, d.cookie.get())
result.write(3, d.status.uint)
if d.text.isSome():
result.write(4, d.text.get())
result.finish()
proc encode(msg: Message): ProtoBuffer =
result = initProtoBuffer()
result.write(1, msg.msgType.uint)
if msg.register.isSome():
result.write(2, msg.register.get().encode())
if msg.registerResponse.isSome():
result.write(3, msg.registerResponse.get().encode())
if msg.unregister.isSome():
result.write(4, msg.unregister.get().encode())
if msg.discover.isSome():
result.write(5, msg.discover.get().encode())
if msg.discoverResponse.isSome():
result.write(6, msg.discoverResponse.get().encode())
result.finish()
proc decode(_: typedesc[Cookie], buf: seq[byte]): Option[Cookie] =
var c: Cookie
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, c.offset)
r2 = pb.getRequiredField(2, c.ns)
if r1.isErr() or r2.isErr(): return none(Cookie)
some(c)
proc decode(_: typedesc[Register], buf: seq[byte]): Option[Register] =
var
r: Register
ttl: uint64
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, r.ns)
r2 = pb.getRequiredField(2, r.signedPeerRecord)
r3 = pb.getField(3, ttl)
if r1.isErr() or r2.isErr() or r3.isErr(): return none(Register)
if r3.get(): r.ttl = some(ttl)
some(r)
proc decode(_: typedesc[RegisterResponse], buf: seq[byte]): Option[RegisterResponse] =
var
rr: RegisterResponse
statusOrd: uint
text: string
ttl: uint64
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, statusOrd)
r2 = pb.getField(2, text)
r3 = pb.getField(3, ttl)
if r1.isErr() or r2.isErr() or r3.isErr() or
not checkedEnumAssign(rr.status, statusOrd): return none(RegisterResponse)
if r2.get(): rr.text = some(text)
if r3.get(): rr.ttl = some(ttl)
some(rr)
proc decode(_: typedesc[Unregister], buf: seq[byte]): Option[Unregister] =
var u: Unregister
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, u.ns)
if r1.isErr(): return none(Unregister)
some(u)
proc decode(_: typedesc[Discover], buf: seq[byte]): Option[Discover] =
var
d: Discover
limit: uint64
cookie: seq[byte]
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, d.ns)
r2 = pb.getField(2, limit)
r3 = pb.getField(3, cookie)
if r1.isErr() or r2.isErr() or r3.isErr: return none(Discover)
if r2.get(): d.limit = some(limit)
if r3.get(): d.cookie = some(cookie)
some(d)
proc decode(_: typedesc[DiscoverResponse], buf: seq[byte]): Option[DiscoverResponse] =
var
dr: DiscoverResponse
registrations: seq[seq[byte]]
cookie: seq[byte]
statusOrd: uint
text: string
let
pb = initProtoBuffer(buf)
r1 = pb.getRepeatedField(1, registrations)
r2 = pb.getField(2, cookie)
r3 = pb.getRequiredField(3, statusOrd)
r4 = pb.getField(4, text)
if r1.isErr() or r2.isErr() or r3.isErr or r4.isErr() or
not checkedEnumAssign(dr.status, statusOrd): return none(DiscoverResponse)
for reg in registrations:
var r: Register
let regOpt = Register.decode(reg)
if regOpt.isNone(): return none(DiscoverResponse)
dr.registrations.add(regOpt.get())
if r2.get(): dr.cookie = some(cookie)
if r4.get(): dr.text = some(text)
some(dr)
proc decode(_: typedesc[Message], buf: seq[byte]): Option[Message] =
var
msg: Message
statusOrd: uint
pbr, pbrr, pbu, pbd, pbdr: ProtoBuffer
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, statusOrd)
r2 = pb.getField(2, pbr)
r3 = pb.getField(3, pbrr)
r4 = pb.getField(4, pbu)
r5 = pb.getField(5, pbd)
r6 = pb.getField(6, pbdr)
if r1.isErr() or r2.isErr() or r3.isErr() or
r4.isErr() or r5.isErr() or r6.isErr() or
not checkedEnumAssign(msg.msgType, statusOrd): return none(Message)
if r2.get():
msg.register = Register.decode(pbr.buffer)
if msg.register.isNone(): return none(Message)
if r3.get():
msg.registerResponse = RegisterResponse.decode(pbrr.buffer)
if msg.registerResponse.isNone(): return none(Message)
if r4.get():
msg.unregister = Unregister.decode(pbu.buffer)
if msg.unregister.isNone(): return none(Message)
if r5.get():
msg.discover = Discover.decode(pbd.buffer)
if msg.discover.isNone(): return none(Message)
if r6.get():
msg.discoverResponse = DiscoverResponse.decode(pbdr.buffer)
if msg.discoverResponse.isNone(): return none(Message)
some(msg)
type
RendezVousError* = object of LPError
RegisteredData = object
expiration: Moment
peerId: PeerId
data: Register
RendezVous* = ref object of LPProtocol
# Registered needs to be an offsetted sequence
# because we need stable index for the cookies.
registered: OffsettedSeq[RegisteredData]
# Namespaces is a table whose key is a salted namespace and
# the value is the index sequence corresponding to this
# namespace in the offsettedqueue.
namespaces: Table[string, seq[int]]
rng: ref HmacDrbgContext
salt: string
defaultDT: Moment
registerDeletionLoop: Future[void]
#registerEvent: AsyncEvent # TODO: to raise during the heartbeat
# + make the heartbeat sleep duration "smarter"
sema: AsyncSemaphore
peers: seq[PeerId]
cookiesSaved: Table[PeerId, Table[string, seq[byte]]]
switch: Switch
proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] =
if spr.len == 0: return err("Empty peer record")
let signedEnv = ? SignedPeerRecord.decode(spr).mapErr(x => $x)
if signedEnv.data.peerId != peerId:
return err("Bad Peer ID")
return ok()
proc sendRegisterResponse(conn: Connection,
ttl: uint64) {.async.} =
let msg = encode(Message(
msgType: MessageType.RegisterResponse,
registerResponse: some(RegisterResponse(status: Ok, ttl: some(ttl)))))
await conn.writeLp(msg.buffer)
proc sendRegisterResponseError(conn: Connection,
status: ResponseStatus,
text: string = "") {.async.} =
let msg = encode(Message(
msgType: MessageType.RegisterResponse,
registerResponse: some(RegisterResponse(status: status, text: some(text)))))
await conn.writeLp(msg.buffer)
proc sendDiscoverResponse(conn: Connection,
s: seq[Register],
cookie: Cookie) {.async.} =
let msg = encode(Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: some(DiscoverResponse(
status: Ok,
registrations: s,
cookie: some(cookie.encode().buffer)
))
))
await conn.writeLp(msg.buffer)
proc sendDiscoverResponseError(conn: Connection,
status: ResponseStatus,
text: string = "") {.async.} =
let msg = encode(Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: some(DiscoverResponse(status: status, text: some(text)))))
await conn.writeLp(msg.buffer)
proc countRegister(rdv: RendezVous, peerId: PeerId): int =
let n = Moment.now()
for data in rdv.registered:
if data.peerId == peerId and data.expiration > n:
result.inc()
proc save(rdv: RendezVous,
ns: string,
peerId: PeerId,
r: Register,
update: bool = true) =
let nsSalted = ns & rdv.salt
discard rdv.namespaces.hasKeyOrPut(nsSalted, newSeq[int]())
try:
for index in rdv.namespaces[nsSalted]:
if rdv.registered[index].peerId == peerId:
if update == false: return
rdv.registered[index].expiration = rdv.defaultDT
rdv.registered.add(
RegisteredData(
peerId: peerId,
expiration: Moment.now() + r.ttl.get(MinimumTTL).int64.seconds,
data: r
)
)
rdv.namespaces[nsSalted].add(rdv.registered.high)
# rdv.registerEvent.fire()
except KeyError:
doAssert false, "Should have key"
proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
trace "Received Register", peerId = conn.peerId, ns = r.ns
if r.ns.len notin 1..255:
return conn.sendRegisterResponseError(InvalidNamespace)
let ttl = r.ttl.get(MinimumTTL)
if ttl notin MinimumTTL..MaximumTTL:
return conn.sendRegisterResponseError(InvalidTTL)
let pr = checkPeerRecord(r.signedPeerRecord, conn.peerId)
if pr.isErr():
return conn.sendRegisterResponseError(InvalidSignedPeerRecord, pr.error())
if rdv.countRegister(conn.peerId) >= RegistrationLimitPerPeer:
return conn.sendRegisterResponseError(NotAuthorized, "Registration limit reached")
rdv.save(r.ns, conn.peerId, r)
conn.sendRegisterResponse(ttl)
proc unregister(rdv: RendezVous, conn: Connection, u: Unregister) =
trace "Received Unregister", peerId = conn.peerId, ns = u.ns
let nsSalted = u.ns & rdv.salt
try:
for index in rdv.namespaces[nsSalted]:
if rdv.registered[index].peerId == conn.peerId:
rdv.registered[index].expiration = rdv.defaultDT
except KeyError:
return
proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
trace "Received Discover", peerId = conn.peerId, ns = d.ns
if d.ns.len notin 0..255:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
var limit = min(DiscoverLimit, d.limit.get(DiscoverLimit))
var
cookie =
if d.cookie.isSome():
try:
Cookie.decode(d.cookie.get()).get()
except CatchableError:
await conn.sendDiscoverResponseError(InvalidCookie)
return
else: Cookie(offset: rdv.registered.low().uint64 - 1)
if cookie.ns != d.ns or
cookie.offset notin rdv.registered.low().uint64..rdv.registered.high().uint64:
cookie = Cookie(offset: rdv.registered.low().uint64 - 1)
let
nsSalted = d.ns & rdv.salt
namespaces =
if d.ns != "":
try:
rdv.namespaces[nsSalted]
except KeyError:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
else: toSeq(cookie.offset.int..rdv.registered.high())
if namespaces.len() == 0:
await conn.sendDiscoverResponse(@[], Cookie())
return
var offset = namespaces[^1]
let n = Moment.now()
var s = collect(newSeq()):
for index in namespaces:
var reg = rdv.registered[index]
if limit == 0:
offset = index
break
if reg.expiration < n or index.uint64 <= cookie.offset: continue
limit.dec()
reg.data.ttl = some((reg.expiration - Moment.now()).seconds.uint64)
reg.data
rdv.rng.shuffle(s)
await conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns))
proc advertisePeer(rdv: RendezVous,
peer: PeerId,
msg: seq[byte]) {.async.} =
proc advertiseWrap() {.async.} =
try:
let conn = await rdv.switch.dial(peer, RendezVousCodec)
defer: await conn.close()
await conn.writeLp(msg)
let
buf = await conn.readLp(4096)
msgRecv = Message.decode(buf).get()
if msgRecv.msgType != MessageType.RegisterResponse:
trace "Unexpected register response", peer, msgType = msgRecv.msgType
elif msgRecv.registerResponse.isNone() or
msgRecv.registerResponse.get().status != ResponseStatus.Ok:
trace "Refuse to register", peer, response = msgRecv.registerResponse
except CatchableError as exc:
trace "exception in the advertise", error = exc.msg
finally:
rdv.sema.release()
await rdv.sema.acquire()
discard await advertiseWrap().withTimeout(5.seconds)
proc advertise*(rdv: RendezVous,
ns: string,
ttl: Duration = MinimumDuration) {.async.} =
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode()
if sprBuff.isErr():
raise newException(RendezVousError, "Wrong Signed Peer Record")
if ns.len notin 1..255:
raise newException(RendezVousError, "Invalid namespace")
if ttl notin MinimumDuration..MaximumDuration:
raise newException(RendezVousError, "Invalid time to live")
let
r = Register(ns: ns, signedPeerRecord: sprBuff.get(), ttl: some(ttl.seconds.uint64))
msg = encode(Message(msgType: MessageType.Register, register: some(r)))
rdv.save(ns, rdv.switch.peerInfo.peerId, r)
let fut = collect(newSeq()):
for peer in rdv.peers:
trace "Send Advertise", peerId = peer, ns
rdv.advertisePeer(peer, msg.buffer)
await allFutures(fut)
proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
let
nsSalted = ns & rdv.salt
n = Moment.now()
try:
collect(newSeq()):
for index in rdv.namespaces[nsSalted]:
if rdv.registered[index].expiration > n:
SignedPeerRecord.decode(rdv.registered[index].data.signedPeerRecord).get().data
except KeyError as exc:
@[]
proc request*(rdv: RendezVous,
ns: string,
l: int = DiscoverLimit.int): Future[seq[PeerRecord]] {.async.} =
let nsSalted = ns & rdv.salt
var
s: Table[PeerId, (PeerRecord, Register)]
limit: uint64
d = Discover(ns: ns)
if l <= 0 or l > DiscoverLimit.int:
raise newException(RendezVousError, "Invalid limit")
if ns.len notin 0..255:
raise newException(RendezVousError, "Invalid namespace")
limit = l.uint64
proc requestPeer(peer: PeerId) {.async.} =
let conn = await rdv.switch.dial(peer, RendezVousCodec)
defer: await conn.close()
d.limit = some(limit)
d.cookie =
try:
some(rdv.cookiesSaved[peer][ns])
except KeyError as exc:
none(seq[byte])
await conn.writeLp(encode(Message(
msgType: MessageType.Discover,
discover: some(d))).buffer)
let
buf = await conn.readLp(65536)
msgRcv = Message.decode(buf).get()
if msgRcv.msgType != MessageType.DiscoverResponse or
msgRcv.discoverResponse.isNone():
debug "Unexpected discover response", msgType = msgRcv.msgType
return
let resp = msgRcv.discoverResponse.get()
if resp.status != ResponseStatus.Ok:
trace "Cannot discover", ns, status = resp.status, text = resp.text
return
if resp.cookie.isSome() and resp.cookie.get().len < 1000:
if rdv.cookiesSaved.hasKeyOrPut(peer, {ns: resp.cookie.get()}.toTable):
rdv.cookiesSaved[peer][ns] = resp.cookie.get()
for r in resp.registrations:
if limit == 0: return
if r.ttl.isNone() or r.ttl.get() > MaximumTTL: continue
let sprRes = SignedPeerRecord.decode(r.signedPeerRecord)
if sprRes.isErr(): continue
let pr = sprRes.get().data
if s.hasKey(pr.peerId):
let (prSaved, rSaved) = s[pr.peerId]
if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get() < r.ttl.get()) or
prSaved.seqNo < pr.seqNo:
s[pr.peerId] = (pr, r)
else:
s[pr.peerId] = (pr, r)
limit.dec()
for (_, r) in s.values():
rdv.save(ns, peer, r, false)
for peer in rdv.peers:
if limit == 0: break
if RendezVousCodec notin rdv.switch.peerStore[ProtoBook][peer]: continue
try:
trace "Send Request", peerId = peer, ns
await peer.requestPeer()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception catch in request", error = exc.msg
return toSeq(s.values()).mapIt(it[0])
proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
let nsSalted = ns & rdv.salt
try:
for index in rdv.namespaces[nsSalted]:
if rdv.registered[index].peerId == rdv.switch.peerInfo.peerId:
rdv.registered[index].expiration = rdv.defaultDT
except KeyError:
return
proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =
# TODO: find a way to improve this, maybe something similar to the advertise
if ns.len notin 1..255:
raise newException(RendezVousError, "Invalid namespace")
rdv.unsubscribeLocally(ns)
let msg = encode(Message(
msgType: MessageType.Unregister,
unregister: some(Unregister(ns: ns))))
proc unsubscribePeer(rdv: RendezVous, peerId: PeerId) {.async.} =
try:
let conn = await rdv.switch.dial(peerId, RendezVousCodec)
defer: await conn.close()
await conn.writeLp(msg.buffer)
except CatchableError as exc:
trace "exception while unsubscribing", error = exc.msg
for peer in rdv.peers:
discard await rdv.unsubscribePeer(peer).withTimeout(5.seconds)
proc setup*(rdv: RendezVous, switch: Switch) =
rdv.switch = switch
proc handlePeer(peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
rdv.peers.add(peerId)
elif event.kind == PeerEventKind.Left:
rdv.peers.keepItIf(it != peerId)
rdv.switch.addPeerEventHandler(handlePeer, Joined)
rdv.switch.addPeerEventHandler(handlePeer, Left)
proc new*(T: typedesc[RendezVous],
rng: ref HmacDrbgContext = newRng()): T =
let rdv = T(
rng: rng,
salt: string.fromBytes(generateBytes(rng[], 8)),
registered: initOffsettedSeq[RegisteredData](1),
defaultDT: Moment.now() - 1.days,
#registerEvent: newAsyncEvent(),
sema: newAsyncSemaphore(SemaphoreDefaultSize)
)
logScope: topics = "libp2p discovery rendezvous"
proc handleStream(conn: Connection, proto: string) {.async, gcsafe.} =
try:
let
buf = await conn.readLp(4096)
msg = Message.decode(buf).get()
case msg.msgType:
of MessageType.Register: await rdv.register(conn, msg.register.get())
of MessageType.RegisterResponse:
trace "Got an unexpected Register Response", response = msg.registerResponse
of MessageType.Unregister: rdv.unregister(conn, msg.unregister.get())
of MessageType.Discover: await rdv.discover(conn, msg.discover.get())
of MessageType.DiscoverResponse:
trace "Got an unexpected Discover Response", response = msg.discoverResponse
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in rendezvous handler", error = exc.msg
finally:
await conn.close()
rdv.handler = handleStream
rdv.codec = RendezVousCodec
return rdv
proc new*(T: typedesc[RendezVous],
switch: Switch,
rng: ref HmacDrbgContext = newRng()): T =
let rdv = T.new(rng)
rdv.setup(switch)
return rdv
proc deletesRegister(rdv: RendezVous) {.async.} =
heartbeat "Register timeout", 1.minutes:
let n = Moment.now()
rdv.registered.flushIfIt(it.expiration < n)
for data in rdv.namespaces.mvalues():
data.keepItIf(it >= rdv.registered.offset)
method start*(rdv: RendezVous) {.async.} =
if not rdv.registerDeletionLoop.isNil:
warn "Starting rendezvous twice"
return
rdv.registerDeletionLoop = rdv.deletesRegister()
rdv.started = true
method stop*(rdv: RendezVous) {.async.} =
if rdv.registerDeletionLoop.isNil:
warn "Stopping rendezvous without starting it"
return
rdv.started = false
rdv.registerDeletionLoop.cancel()
rdv.registerDeletionLoop = nil

View File

@@ -13,7 +13,6 @@ else:
{.push raises: [].}
import std/[strformat]
import stew/results
import chronos, chronicles
import ../protocol,
../../stream/streamseq,
@@ -22,7 +21,7 @@ import ../protocol,
../../peerinfo,
../../errors
export protocol, results
export protocol
logScope:
topics = "libp2p secure"

View File

@@ -13,13 +13,10 @@ else:
{.push raises: [].}
import std/[oids, strformat]
import stew/results
import chronos, chronicles, metrics
import connection
import ../utility
export results
logScope:
topics = "libp2p chronosstream"
@@ -130,9 +127,6 @@ proc completeWrite(
method write*(s: ChronosStream, msg: seq[byte]): Future[void] =
# Avoid a copy of msg being kept in the closure created by `{.async.}` as this
# drives up memory usage
if msg.len == 0:
trace "Empty byte seq, nothing to write"
return
if s.closed:
let fut = newFuture[void]("chronosstream.write.closed")
fut.fail(newLPStreamClosedError())

View File

@@ -13,14 +13,13 @@ else:
{.push raises: [].}
import std/[hashes, oids, strformat]
import stew/results
import chronicles, chronos, metrics
import lpstream,
../multiaddress,
../peerinfo,
../errors
export lpstream, peerinfo, errors, results
export lpstream, peerinfo, errors
logScope:
topics = "libp2p connection"

View File

@@ -15,7 +15,6 @@ else:
{.push raises: [].}
import std/[oids, sequtils]
import stew/results
import chronos, chronicles
import transport,
../errors,
@@ -32,7 +31,7 @@ import transport,
logScope:
topics = "libp2p tcptransport"
export transport, results
export transport
const
TcpTransportTrackerName* = "libp2p.tcptransport"

View File

@@ -15,7 +15,6 @@ else:
{.push raises: [].}
import std/[sequtils]
import stew/results
import chronos, chronicles
import transport,
../errors,
@@ -32,7 +31,7 @@ import transport,
logScope:
topics = "libp2p wstransport"
export transport, websock, results
export transport, websock
const
WsTransportTrackerName* = "libp2p.wstransport"

View File

@@ -76,9 +76,7 @@ proc identify*(
conn: Connection) {.async, gcsafe.} =
## identify the connection
echo "==>0", self.identity.codec
if (await self.ms.select(conn, self.identity.codec)):
echo "==>1"
let
info = await self.identity.identify(conn, conn.peerId)
peerStore = self.connManager.peerStore

View File

@@ -1,73 +0,0 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import sequtils
type
OffsettedSeq*[T] = object
s*: seq[T]
offset*: int
proc initOffsettedSeq*[T](offset: int = 0): OffsettedSeq[T] =
OffsettedSeq[T](s: newSeq[T](), offset: offset)
proc all*[T](o: OffsettedSeq[T], pred: proc (x: T): bool): bool =
o.s.all(pred)
proc any*[T](o: OffsettedSeq[T], pred: proc (x: T): bool): bool =
o.s.any(pred)
proc apply*[T](o: OffsettedSeq[T], op: proc (x: T)) =
o.s.apply(pred)
proc apply*[T](o: OffsettedSeq[T], op: proc (x: T): T) =
o.s.apply(pred)
proc apply*[T](o: OffsettedSeq[T], op: proc (x: var T)) =
o.s.apply(pred)
func count*[T](o: OffsettedSeq[T], x: T): int =
o.s.count(x)
proc flushIf*[T](o: OffsettedSeq[T], pred: proc (x: T): bool) =
var i = 0
for e in o.s:
if not pred(e): break
i.inc()
if i > 0:
o.s.delete(0..<i)
o.offset.inc(i)
template flushIfIt*(o, pred: untyped) =
var i = 0
for it {.inject.} in o.s:
if not pred: break
i.inc()
if i > 0:
when (NimMajor, NimMinor) < (1, 4):
o.s.delete(0, i - 1)
else:
o.s.delete(0..<i)
o.offset.inc(i)
proc add*[T](o: var OffsettedSeq[T], v: T) =
o.s.add(v)
proc `[]`*[T](o: var OffsettedSeq[T], index: int): var T =
o.s[index - o.offset]
iterator items*[T](o: OffsettedSeq[T]): T =
for e in o.s:
yield e
proc high*[T](o: OffsettedSeq[T]): int =
o.s.high + o.offset
proc low*[T](o: OffsettedSeq[T]): int =
o.s.low + o.offset

View File

@@ -3,9 +3,7 @@ site_name: nim-libp2p
repo_url: https://github.com/status-im/nim-libp2p
repo_name: status-im/nim-libp2p
site_url: https://status-im.github.io/nim-libp2p/docs
# Can't find a way to point the edit to the .nim instead
# of the .md
edit_uri: ''
edit_uri: edit/unstable/examples/
docs_dir: examples
@@ -42,8 +40,6 @@ theme:
nav:
- Introduction: README.md
- Tutorials:
- 'Simple connection': tutorial_1_connect.md
- 'Create a custom protocol': tutorial_2_customproto.md
- 'Protobuf': tutorial_3_protobuf.md
- 'Circuit Relay': circuitrelay.md
- 'Part I: Simple connection': tutorial_1_connect.md
- 'Part II: Custom protocol': tutorial_2_customproto.md
- Reference: '/nim-libp2p/master/libp2p.html'

View File

@@ -1,7 +1,8 @@
{.used.}
import std/options
import sequtils
import chronos, stew/[byteutils, results]
import chronos, stew/byteutils
import ../libp2p/[stream/connection,
transports/transport,
upgrademngrs/upgrade,

View File

@@ -1,5 +1,4 @@
import sequtils
import stew/results
import chronos
import ../libp2p/[connmanager,
stream/connection,

View File

@@ -1,51 +0,0 @@
{.used.}
import options, chronos, sets
import stew/byteutils
import ../libp2p/[protocols/rendezvous,
switch,
builders,
discovery/discoverymngr,
discovery/rendezvousinterface,]
import ./helpers
proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
SwitchBuilder.new()
.withRng(newRng())
.withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
.withTcpTransport()
.withMplex()
.withNoise()
.withRendezVous(rdv)
.build()
suite "Discovery":
teardown:
checkTrackers()
asyncTest "RendezVous test":
let
rdvA = RendezVous.new()
rdvB = RendezVous.new()
clientA = createSwitch(rdvA)
clientB = createSwitch(rdvB)
remoteNode = createSwitch()
dmA = DiscoveryManager()
dmB = DiscoveryManager()
dmA.add(RendezVousInterface.new(rdvA, ttr = 500.milliseconds))
dmB.add(RendezVousInterface.new(rdvB))
await allFutures(clientA.start(), clientB.start(), remoteNode.start())
await clientB.connect(remoteNode.peerInfo.peerId, remoteNode.peerInfo.addrs)
await clientA.connect(remoteNode.peerInfo.peerId, remoteNode.peerInfo.addrs)
dmB.advertise(RdvNamespace("foo"))
let
query = dmA.request(RdvNamespace("foo"))
res = await query.getPeer()
check:
res{PeerId}.get() == clientB.peerInfo.peerId
res[PeerId] == clientB.peerInfo.peerId
res.getAll(PeerId) == @[clientB.peerInfo.peerId]
toHashSet(res.getAll(MultiAddress)) == toHashSet(clientB.peerInfo.addrs)
await allFutures(clientA.stop(), clientB.stop(), remoteNode.stop())

View File

@@ -386,11 +386,6 @@ suite "MultiAddress test suite":
let ma = MultiAddress.init("/ip4/0.0.0.0/tcp/0/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSuNEXT/unix/stdio/").get()
check:
$ma[0..0].get() == "/ip4/0.0.0.0"
$ma[^1].get() == "/unix/stdio"
ma[-100].isErr()
ma[100].isErr()
ma[^100].isErr()
ma[^0].isErr()
$ma[0..1].get() == "/ip4/0.0.0.0/tcp/0"
$ma[1..2].get() == "/tcp/0/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC"
$ma[^3..^1].get() == "/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSuNEXT/unix/stdio"

View File

@@ -139,18 +139,7 @@ suite "Name resolving":
asyncTest "dnsaddr infinite recursion":
resolver.txtResponses["_dnsaddr.bootstrap.libp2p.io"] = @["dnsaddr=/dnsaddr/bootstrap.libp2p.io"]
check testOne("/dnsaddr/bootstrap.libp2p.io/", newSeq[string]())
test "getHostname":
check:
MultiAddress.init("/dnsaddr/bootstrap.libp2p.io/").tryGet().getHostname == "bootstrap.libp2p.io"
MultiAddress.init("").tryGet().getHostname == ""
MultiAddress.init("/ip4/147.75.69.143/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN").tryGet().getHostname == "147.75.69.143"
MultiAddress.init("/ip6/2604:1380:1000:6000::1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN").tryGet().getHostname == "2604:1380:1000:6000::1"
MultiAddress.init("/dns/localhost/udp/0").tryGet().getHostname == "localhost"
MultiAddress.init("/dns4/hello.com/udp/0").tryGet().getHostname == "hello.com"
MultiAddress.init("/dns6/hello.com/udp/0").tryGet().getHostname == "hello.com"
MultiAddress.init("/wss/").tryGet().getHostname == ""
check testOne("/dnsaddr/bootstrap.libp2p.io/", "/dnsaddr/bootstrap.libp2p.io/")
suite "DNS Resolving":
teardown:
@@ -182,7 +171,7 @@ suite "Name resolving":
# The test
var dnsresolver = DnsResolver.new(@[server.localAddress])
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_UNSPEC)) ==
mapIt(
@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0",
@@ -220,7 +209,7 @@ suite "Name resolving":
# The test
var dnsresolver = DnsResolver.new(@[unresponsiveServer.localAddress, server.localAddress])
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_INET)) ==
mapIt(@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0"], initTAddress(it))

View File

@@ -37,7 +37,5 @@ import testtcptransport,
testmplex,
testrelayv1,
testrelayv2,
testrendezvous,
testdiscovery,
testyamux,
testautonat

View File

@@ -295,7 +295,7 @@ suite "Noise":
(switch2, peerInfo2) = createSwitch(ma2, true, true) # secio, we want to fail
await switch1.start()
await switch2.start()
expect(DialFailedError):
expect(UpgradeFailedError):
let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec)
await allFuturesThrowing(

View File

@@ -1,125 +0,0 @@
{.used.}
import options, sequtils, strutils
import stew/byteutils, chronos
import ../libp2p/[protocols/rendezvous,
switch,
builders,]
import ./helpers
proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
SwitchBuilder.new()
.withRng(newRng())
.withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
.withTcpTransport()
.withMplex()
.withNoise()
.withRendezVous(rdv)
.build()
suite "RendezVous":
teardown:
checkTrackers()
asyncTest "Simple local test":
let
rdv = RendezVous.new()
s = createSwitch(rdv)
await s.start()
let res0 = rdv.requestLocally("empty")
check res0.len == 0
await rdv.advertise("foo")
let res1 = rdv.requestLocally("foo")
check:
res1.len == 1
res1[0] == s.peerInfo.signedPeerRecord.data
let res2 = rdv.requestLocally("bar")
check res2.len == 0
rdv.unsubscribeLocally("foo")
let res3 = rdv.requestLocally("foo")
check res3.len == 0
await s.stop()
asyncTest "Simple remote test":
let
rdv = RendezVous.new()
client = createSwitch(rdv)
remoteSwitch = createSwitch()
await client.start()
await remoteSwitch.start()
await client.connect(remoteSwitch.peerInfo.peerId, remoteSwitch.peerInfo.addrs)
let res0 = await rdv.request("empty")
check res0.len == 0
await rdv.advertise("foo")
let res1 = await rdv.request("foo")
check:
res1.len == 1
res1[0] == client.peerInfo.signedPeerRecord.data
let res2 = await rdv.request("bar")
check res2.len == 0
await rdv.unsubscribe("foo")
let res3 = await rdv.request("foo")
check res3.len == 0
await allFutures(client.stop(), remoteSwitch.stop())
asyncTest "Harder remote test":
var
rdvSeq: seq[RendezVous] = @[]
clientSeq: seq[Switch] = @[]
remoteSwitch = createSwitch()
for x in 0..10:
rdvSeq.add(RendezVous.new())
clientSeq.add(createSwitch(rdvSeq[^1]))
await remoteSwitch.start()
await allFutures(clientSeq.mapIt(it.start()))
await allFutures(clientSeq.mapIt(remoteSwitch.connect(it.peerInfo.peerId, it.peerInfo.addrs)))
await allFutures(rdvSeq.mapIt(it.advertise("foo")))
var data = clientSeq.mapIt(it.peerInfo.signedPeerRecord.data)
let res1 = await rdvSeq[0].request("foo", 5)
check res1.len == 5
for d in res1:
check d in data
data.keepItIf(it notin res1)
let res2 = await rdvSeq[0].request("foo")
check res2.len == 5
for d in res2:
check d in data
let res3 = await rdvSeq[0].request("foo")
check res3.len == 0
await remoteSwitch.stop()
await allFutures(clientSeq.mapIt(it.stop()))
asyncTest "Simple cookie test":
let
rdvA = RendezVous.new()
rdvB = RendezVous.new()
clientA = createSwitch(rdvA)
clientB = createSwitch(rdvB)
remoteSwitch = createSwitch()
await clientA.start()
await clientB.start()
await remoteSwitch.start()
await clientA.connect(remoteSwitch.peerInfo.peerId, remoteSwitch.peerInfo.addrs)
await clientB.connect(remoteSwitch.peerInfo.peerId, remoteSwitch.peerInfo.addrs)
await rdvA.advertise("foo")
let res1 = await rdvA.request("foo")
await rdvB.advertise("foo")
let res2 = await rdvA.request("foo")
check:
res2.len == 1
res2[0] == clientB.peerInfo.signedPeerRecord.data
await allFutures(clientA.stop(), clientB.stop(), remoteSwitch.stop())
asyncTest "Various local error":
let
rdv = RendezVous.new()
switch = createSwitch(rdv)
expect RendezVousError: discard await rdv.request("A".repeat(300))
expect RendezVousError: discard await rdv.request("A", -1)
expect RendezVousError: discard await rdv.request("A", 3000)
expect RendezVousError: await rdv.advertise("A".repeat(300))
expect RendezVousError: await rdv.advertise("A", 2.weeks)
expect RendezVousError: await rdv.advertise("A", 5.minutes)

View File

@@ -201,25 +201,13 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo.peerId)
check not switch2.isConnected(switch1.peerInfo.peerId)
asyncTest "e2e connect to peer with unknown PeerId":
let resolver = MockResolver.new()
asyncTest "e2e connect to peer with unkown PeerId":
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise], nameResolver = resolver)
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
await switch1.start()
await switch2.start()
# via dnsaddr
resolver.txtResponses["_dnsaddr.test.io"] = @[
"dnsaddr=" & $switch1.peerInfo.addrs[0] & "/p2p/" & $switch1.peerInfo.peerId,
]
check: (await switch2.connect(@[MultiAddress.init("/dnsaddr/test.io/").tryGet()])) == switch1.peerInfo.peerId
await switch2.disconnect(switch1.peerInfo.peerId)
# via direct ip
check not switch2.isConnected(switch1.peerInfo.peerId)
check: (await switch2.connect(switch1.peerInfo.addrs)) == switch1.peerInfo.peerId
await switch2.disconnect(switch1.peerInfo.peerId)
await allFuturesThrowing(
@@ -677,7 +665,7 @@ suite "Switch":
await switch.start()
var peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get()
expect DialFailedError:
expect LPStreamClosedError, LPStreamEOFError:
await switch.connect(peerId, transport.addrs)
await handlerWait
@@ -1006,10 +994,9 @@ suite "Switch":
await srcWsSwitch.start()
resolver.txtResponses["_dnsaddr.test.io"] = @[
"dnsaddr=/dns4/localhost" & $destSwitch.peerInfo.addrs[0][1..^1].tryGet() & "/p2p/" & $destSwitch.peerInfo.peerId,
"dnsaddr=/dns4/localhost" & $destSwitch.peerInfo.addrs[1][1..^1].tryGet()
"dnsaddr=" & $destSwitch.peerInfo.addrs[0],
"dnsaddr=" & $destSwitch.peerInfo.addrs[1]
]
resolver.ipResponses[("localhost", false)] = @["127.0.0.1"]
let testAddr = MultiAddress.init("/dnsaddr/test.io/").tryGet()

View File

@@ -1,29 +0,0 @@
import os, strutils
let contents =
if paramCount() > 0:
readFile(paramStr(1))
else:
stdin.readAll()
var code = ""
for line in contents.splitLines(true):
let
stripped = line.strip()
isMarkdown = stripped.startsWith("##")
if isMarkdown:
if code.strip.len > 0:
echo "```nim"
echo code.strip(leading = false)
echo "```"
code = ""
echo(if stripped.len > 3: stripped[3..^1]
else: "")
else:
code &= line
if code.strip.len > 0:
echo ""
echo "```nim"
echo code
echo "```"