mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-07 01:34:03 -05:00
chore: remove go daemon (#1705)
This commit is contained in:
9
.github/workflows/ci.yml
vendored
9
.github/workflows/ci.yml
vendored
@@ -72,15 +72,6 @@ jobs:
|
||||
shell: ${{ matrix.shell }}
|
||||
nim_ref: ${{ matrix.nim.ref }}
|
||||
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '~1.16.0' # That's the minimum Go version that works with arm.
|
||||
|
||||
- name: Install p2pd
|
||||
run: |
|
||||
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
|
||||
|
||||
- name: Restore deps from cache
|
||||
id: deps-cache
|
||||
uses: actions/cache@v3
|
||||
|
||||
10
.github/workflows/daily_common.yml
vendored
10
.github/workflows/daily_common.yml
vendored
@@ -69,16 +69,6 @@ jobs:
|
||||
nim_ref: ${{ matrix.nim.ref }}
|
||||
cpu: ${{ matrix.cpu }}
|
||||
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '~1.16.0'
|
||||
cache: false
|
||||
|
||||
- name: Install p2pd
|
||||
run: |
|
||||
V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3
|
||||
|
||||
- name: Install dependencies (pinned)
|
||||
if: ${{ inputs.pinned_deps }}
|
||||
run: |
|
||||
|
||||
10
README.md
10
README.md
@@ -47,7 +47,7 @@ nimble install libp2p
|
||||
You'll find the nim-libp2p documentation [here](https://vacp2p.github.io/nim-libp2p/docs/). See [examples](./examples) for simple usage patterns.
|
||||
|
||||
## Getting Started
|
||||
Try out the chat example. For this you'll need to have [`go-libp2p-daemon`](examples/go-daemon/daemonapi.md) running. Full code can be found [here](https://github.com/status-im/nim-libp2p/blob/master/examples/chat.nim):
|
||||
Try out the chat example. Full code can be found [here](https://github.com/status-im/nim-libp2p/blob/master/examples/chat.nim):
|
||||
|
||||
```bash
|
||||
nim c -r --threads:on examples/directchat.nim
|
||||
@@ -81,12 +81,6 @@ Run unit tests:
|
||||
# run all the unit tests
|
||||
nimble test
|
||||
```
|
||||
**Obs:** Running all tests requires the [`go-libp2p-daemon` to be installed and running](examples/go-daemon/daemonapi.md).
|
||||
|
||||
If you only want to run tests that don't require `go-libp2p-daemon`, use:
|
||||
```
|
||||
nimble testnative
|
||||
```
|
||||
|
||||
For a list of all available test suites, use:
|
||||
```
|
||||
@@ -155,8 +149,6 @@ List of packages modules implemented in nim-libp2p:
|
||||
| [connmanager](libp2p/connmanager.nim) | Connection manager |
|
||||
| [identify / push identify](libp2p/protocols/identify.nim) | [Identify](https://docs.libp2p.io/concepts/fundamentals/protocols/#identify) protocol |
|
||||
| [ping](libp2p/protocols/ping.nim) | [Ping](https://docs.libp2p.io/concepts/fundamentals/protocols/#ping) protocol |
|
||||
| [libp2p-daemon-client](libp2p/daemon/daemonapi.nim) | [go-daemon](https://github.com/libp2p/go-libp2p-daemon) nim wrapper |
|
||||
| [interop-libp2p](tests/testinterop.nim) | Interop tests |
|
||||
| **Transports** | |
|
||||
| [libp2p-tcp](libp2p/transports/tcptransport.nim) | TCP transport |
|
||||
| [libp2p-ws](libp2p/transports/wstransport.nim) | WebSocket & WebSocket Secure transport |
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
import chronos, nimcrypto, strutils
|
||||
import ../../libp2p/daemon/daemonapi
|
||||
import ../hexdump
|
||||
|
||||
const PubSubTopic = "test-net"
|
||||
|
||||
proc dumpSubscribedPeers(api: DaemonAPI) {.async.} =
|
||||
var peers = await api.pubsubListPeers(PubSubTopic)
|
||||
echo "= List of connected and subscribed peers:"
|
||||
for item in peers:
|
||||
echo item.pretty()
|
||||
|
||||
proc dumpAllPeers(api: DaemonAPI) {.async.} =
|
||||
var peers = await api.listPeers()
|
||||
echo "Current connected peers count = ", len(peers)
|
||||
for item in peers:
|
||||
echo item.peer.pretty()
|
||||
|
||||
proc monitor(api: DaemonAPI) {.async.} =
|
||||
while true:
|
||||
echo "Dumping all peers"
|
||||
await dumpAllPeers(api)
|
||||
await sleepAsync(5000)
|
||||
|
||||
proc main() {.async.} =
|
||||
echo "= Starting P2P bootnode"
|
||||
var api = await newDaemonApi({DHTFull, PSGossipSub})
|
||||
var id = await api.identity()
|
||||
echo "= P2P bootnode ", id.peer.pretty(), " started."
|
||||
let mcip4 = multiCodec("ip4")
|
||||
let mcip6 = multiCodec("ip6")
|
||||
echo "= You can use one of this addresses to bootstrap your nodes:"
|
||||
for item in id.addresses:
|
||||
if item.protoCode() == mcip4 or item.protoCode() == mcip6:
|
||||
echo $item & "/ipfs/" & id.peer.pretty()
|
||||
|
||||
asyncSpawn monitor(api)
|
||||
|
||||
proc pubsubLogger(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async.} =
|
||||
let msglen = len(message.data)
|
||||
echo "= Recieved pubsub message with length ",
|
||||
msglen, " bytes from peer ", message.peer.pretty()
|
||||
echo dumpHex(message.data)
|
||||
await api.dumpSubscribedPeers()
|
||||
result = true
|
||||
|
||||
var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger)
|
||||
|
||||
when isMainModule:
|
||||
waitFor(main())
|
||||
while true:
|
||||
poll()
|
||||
@@ -1,132 +0,0 @@
|
||||
import chronos, nimcrypto, strutils
|
||||
import ../../libp2p/daemon/daemonapi
|
||||
|
||||
## nim c -r --threads:on chat.nim
|
||||
when not (compileOption("threads")):
|
||||
{.fatal: "Please, compile this program with the --threads:on option!".}
|
||||
|
||||
const ServerProtocols = @["/test-chat-stream"]
|
||||
|
||||
type CustomData = ref object
|
||||
api: DaemonAPI
|
||||
remotes: seq[StreamTransport]
|
||||
consoleFd: AsyncFD
|
||||
serveFut: Future[void]
|
||||
|
||||
proc threadMain(wfd: AsyncFD) {.thread.} =
|
||||
## This procedure performs reading from `stdin` and sends data over
|
||||
## pipe to main thread.
|
||||
var transp = fromPipe(wfd)
|
||||
|
||||
while true:
|
||||
var line = stdin.readLine()
|
||||
let res = waitFor transp.write(line & "\r\n")
|
||||
|
||||
proc serveThread(udata: CustomData) {.async.} =
|
||||
## This procedure perform reading on pipe and sends data to remote clients.
|
||||
var transp = fromPipe(udata.consoleFd)
|
||||
|
||||
proc remoteReader(transp: StreamTransport) {.async.} =
|
||||
while true:
|
||||
var line = await transp.readLine()
|
||||
if len(line) == 0:
|
||||
break
|
||||
echo ">> ", line
|
||||
|
||||
while true:
|
||||
try:
|
||||
var line = await transp.readLine()
|
||||
if line.startsWith("/connect"):
|
||||
var parts = line.split(" ")
|
||||
if len(parts) == 2:
|
||||
var peerId = PeerId.init(parts[1])
|
||||
var address = MultiAddress.init(multiCodec("p2p-circuit"))
|
||||
address &= MultiAddress.init(multiCodec("p2p"), peerId)
|
||||
echo "= Searching for peer ", peerId.pretty()
|
||||
var id = await udata.api.dhtFindPeer(peerId)
|
||||
echo "= Peer " & parts[1] & " found at addresses:"
|
||||
for item in id.addresses:
|
||||
echo $item
|
||||
echo "= Connecting to peer ", $address
|
||||
await udata.api.connect(peerId, @[address], 30)
|
||||
echo "= Opening stream to peer chat ", parts[1]
|
||||
var stream = await udata.api.openStream(peerId, ServerProtocols)
|
||||
udata.remotes.add(stream.transp)
|
||||
echo "= Connected to peer chat ", parts[1]
|
||||
asyncSpawn remoteReader(stream.transp)
|
||||
elif line.startsWith("/search"):
|
||||
var parts = line.split(" ")
|
||||
if len(parts) == 2:
|
||||
var peerId = PeerId.init(parts[1])
|
||||
echo "= Searching for peer ", peerId.pretty()
|
||||
var id = await udata.api.dhtFindPeer(peerId)
|
||||
echo "= Peer " & parts[1] & " found at addresses:"
|
||||
for item in id.addresses:
|
||||
echo $item
|
||||
elif line.startsWith("/consearch"):
|
||||
var parts = line.split(" ")
|
||||
if len(parts) == 2:
|
||||
var peerId = PeerId.init(parts[1])
|
||||
echo "= Searching for peers connected to peer ", parts[1]
|
||||
var peers = await udata.api.dhtFindPeersConnectedToPeer(peerId)
|
||||
echo "= Found ", len(peers), " connected to peer ", parts[1]
|
||||
for item in peers:
|
||||
var peer = item.peer
|
||||
var addresses = newSeq[string]()
|
||||
var relay = false
|
||||
for a in item.addresses:
|
||||
addresses.add($a)
|
||||
if a.protoName() == "/p2p-circuit":
|
||||
relay = true
|
||||
break
|
||||
if relay:
|
||||
echo peer.pretty(), " * ", " [", addresses.join(", "), "]"
|
||||
else:
|
||||
echo peer.pretty(), " [", addresses.join(", "), "]"
|
||||
elif line.startsWith("/exit"):
|
||||
break
|
||||
else:
|
||||
var msg = line & "\r\n"
|
||||
echo "<< ", line
|
||||
var pending = newSeq[Future[int]]()
|
||||
for item in udata.remotes:
|
||||
pending.add(item.write(msg))
|
||||
if len(pending) > 0:
|
||||
var results = await all(pending)
|
||||
except CatchableError as err:
|
||||
echo err.msg
|
||||
|
||||
proc main() {.async.} =
|
||||
var data = new CustomData
|
||||
data.remotes = newSeq[StreamTransport]()
|
||||
|
||||
var (rfd, wfd) = createAsyncPipe()
|
||||
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
||||
raise newException(ValueError, "Could not initialize pipe!")
|
||||
|
||||
data.consoleFd = rfd
|
||||
|
||||
data.serveFut = serveThread(data)
|
||||
var thread: Thread[AsyncFD]
|
||||
thread.createThread(threadMain, wfd)
|
||||
|
||||
echo "= Starting P2P node"
|
||||
data.api = await newDaemonApi({DHTFull, Bootstrap})
|
||||
await sleepAsync(3000)
|
||||
var id = await data.api.identity()
|
||||
|
||||
proc streamHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
|
||||
echo "= Peer ", stream.peer.pretty(), " joined chat"
|
||||
data.remotes.add(stream.transp)
|
||||
while true:
|
||||
var line = await stream.transp.readLine()
|
||||
if len(line) == 0:
|
||||
break
|
||||
echo ">> ", line
|
||||
|
||||
await data.api.addHandler(ServerProtocols, streamHandler)
|
||||
echo "= Your PeerId is ", id.peer.pretty()
|
||||
await data.serveFut
|
||||
|
||||
when isMainModule:
|
||||
waitFor(main())
|
||||
@@ -1,43 +0,0 @@
|
||||
# Table of Contents
|
||||
- [Introduction](#introduction)
|
||||
- [Prerequisites](#prerequisites)
|
||||
- [Installation](#installation)
|
||||
- [Script](#script)
|
||||
- [Examples](#examples)
|
||||
|
||||
# Introduction
|
||||
This is a libp2p-backed daemon wrapping the functionalities of go-libp2p for use in Nim. <br>
|
||||
For more information about the go daemon, check out [this repository](https://github.com/libp2p/go-libp2p-daemon).
|
||||
> **Required only** for running the tests.
|
||||
|
||||
# Prerequisites
|
||||
Go with version `1.16.0`
|
||||
> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**.
|
||||
|
||||
# Installation
|
||||
Run the build script while having the `go` command pointing to the correct Go version.
|
||||
```sh
|
||||
./scripts/build_p2pd.sh
|
||||
```
|
||||
`build_p2pd.sh` will not rebuild unless needed. If you already have the newest binary and you want to force the rebuild, use:
|
||||
```sh
|
||||
./scripts/build_p2pd.sh -f
|
||||
```
|
||||
Or:
|
||||
```sh
|
||||
./scripts/build_p2pd.sh --force
|
||||
```
|
||||
|
||||
If everything goes correctly, the binary (`p2pd`) should be built and placed in the `$GOPATH/bin` directory.
|
||||
If you're having issues, head into [our discord](https://discord.com/channels/864066763682218004/1115526869769535629) and ask for assistance.
|
||||
|
||||
After successfully building the binary, remember to add it to your path so it can be found. You can do that by running:
|
||||
```sh
|
||||
export PATH="$PATH:$HOME/go/bin"
|
||||
```
|
||||
> **Tip:** To make this change permanent, add the command above to your `.bashrc` file.
|
||||
|
||||
# Examples
|
||||
Examples can be found in the [examples folder](https://github.com/status-im/nim-libp2p/tree/readme/examples/go-daemon)
|
||||
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
import chronos, nimcrypto, strutils, os
|
||||
import ../../libp2p/daemon/daemonapi
|
||||
|
||||
const PubSubTopic = "test-net"
|
||||
|
||||
proc main(bn: string) {.async.} =
|
||||
echo "= Starting P2P node"
|
||||
var bootnodes = bn.split(",")
|
||||
var api = await newDaemonApi(
|
||||
{DHTFull, PSGossipSub, WaitBootstrap}, bootstrapNodes = bootnodes, peersRequired = 1
|
||||
)
|
||||
var id = await api.identity()
|
||||
echo "= P2P node ", id.peer.pretty(), " started:"
|
||||
for item in id.addresses:
|
||||
echo item
|
||||
|
||||
proc pubsubLogger(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async.} =
|
||||
let msglen = len(message.data)
|
||||
echo "= Recieved pubsub message with length ",
|
||||
msglen, " bytes from peer ", message.peer.pretty(), ": "
|
||||
var strdata = cast[string](message.data)
|
||||
echo strdata
|
||||
result = true
|
||||
|
||||
var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger)
|
||||
|
||||
# Waiting for gossipsub interval
|
||||
while true:
|
||||
var peers = await api.pubsubListPeers(PubSubTopic)
|
||||
if len(peers) > 0:
|
||||
break
|
||||
await sleepAsync(1000)
|
||||
|
||||
var data = "HELLO\r\n"
|
||||
var msgData = cast[seq[byte]](data)
|
||||
await api.pubsubPublish(PubSubTopic, msgData)
|
||||
|
||||
when isMainModule:
|
||||
if paramCount() != 1:
|
||||
echo "Please supply bootnodes!"
|
||||
else:
|
||||
waitFor(main(paramStr(1)))
|
||||
while true:
|
||||
poll()
|
||||
@@ -49,12 +49,6 @@ proc tutorialToMd(filename: string) =
|
||||
task testnative, "Runs libp2p native tests":
|
||||
runTest("testnative")
|
||||
|
||||
task testdaemon, "Runs daemon tests":
|
||||
runTest("testdaemon")
|
||||
|
||||
task testinterop, "Runs interop tests":
|
||||
runTest("testinterop")
|
||||
|
||||
task testpubsub, "Runs pubsub tests":
|
||||
runTest("pubsub/testpubsub", "-d:libp2p_gossipsub_1_4")
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,156 +0,0 @@
|
||||
# Nim-Libp2p
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
## This module implements Pool of StreamTransport.
|
||||
import chronos
|
||||
|
||||
const DefaultPoolSize* = 8 ## Default pool size
|
||||
|
||||
type
|
||||
ConnectionFlags = enum
|
||||
None
|
||||
Busy
|
||||
|
||||
PoolItem = object
|
||||
transp*: StreamTransport
|
||||
flags*: set[ConnectionFlags]
|
||||
|
||||
PoolState = enum
|
||||
Connecting
|
||||
Connected
|
||||
Closing
|
||||
Closed
|
||||
|
||||
TransportPool* = ref object ## Transports pool object
|
||||
transports: seq[PoolItem]
|
||||
busyCount: int
|
||||
state: PoolState
|
||||
bufferSize: int
|
||||
event: AsyncEvent
|
||||
|
||||
TransportPoolError* = object of AsyncError
|
||||
|
||||
proc waitAll[T](futs: seq[Future[T]]): Future[void] =
|
||||
## Performs waiting for all Future[T].
|
||||
var counter = len(futs)
|
||||
var retFuture = newFuture[void]("connpool.waitAllConnections")
|
||||
proc cb(udata: pointer) =
|
||||
dec(counter)
|
||||
if counter == 0:
|
||||
retFuture.complete()
|
||||
|
||||
for fut in futs:
|
||||
fut.addCallback(cb)
|
||||
return retFuture
|
||||
|
||||
proc newPool*(
|
||||
address: TransportAddress,
|
||||
poolsize: int = DefaultPoolSize,
|
||||
bufferSize = DefaultStreamBufferSize,
|
||||
): Future[TransportPool] {.async: (raises: [CancelledError]).} =
|
||||
## Establish pool of connections to address ``address`` with size
|
||||
## ``poolsize``.
|
||||
var pool = new TransportPool
|
||||
pool.bufferSize = bufferSize
|
||||
pool.transports = newSeq[PoolItem](poolsize)
|
||||
var conns = newSeq[Future[StreamTransport]](poolsize)
|
||||
pool.state = Connecting
|
||||
for i in 0 ..< poolsize:
|
||||
conns[i] = connect(address, bufferSize)
|
||||
# Waiting for all connections to be established.
|
||||
await waitAll(conns)
|
||||
# Checking connections and preparing pool.
|
||||
for i in 0 ..< poolsize:
|
||||
if conns[i].failed:
|
||||
raise conns[i].error
|
||||
else:
|
||||
let transp = conns[i].read()
|
||||
let item = PoolItem(transp: transp)
|
||||
pool.transports[i] = item
|
||||
# Setup available connections event
|
||||
pool.event = newAsyncEvent()
|
||||
pool.state = Connected
|
||||
result = pool
|
||||
|
||||
proc acquire*(
|
||||
pool: TransportPool
|
||||
): Future[StreamTransport] {.async: (raises: [CancelledError, TransportPoolError]).} =
|
||||
## Acquire non-busy connection from pool ``pool``.
|
||||
var transp: StreamTransport
|
||||
if pool.state in {Connected}:
|
||||
while true:
|
||||
if pool.busyCount < len(pool.transports):
|
||||
for conn in pool.transports.mitems():
|
||||
if Busy notin conn.flags:
|
||||
conn.flags.incl(Busy)
|
||||
inc(pool.busyCount)
|
||||
transp = conn.transp
|
||||
break
|
||||
else:
|
||||
await pool.event.wait()
|
||||
pool.event.clear()
|
||||
|
||||
if not isNil(transp):
|
||||
break
|
||||
else:
|
||||
raise newException(TransportPoolError, "Pool is not ready!")
|
||||
result = transp
|
||||
|
||||
proc release*(
|
||||
pool: TransportPool, transp: StreamTransport
|
||||
) {.async: (raises: [TransportPoolError]).} =
|
||||
## Release connection ``transp`` back to pool ``pool``.
|
||||
if pool.state in {Connected, Closing}:
|
||||
var found = false
|
||||
for conn in pool.transports.mitems():
|
||||
if conn.transp == transp:
|
||||
conn.flags.excl(Busy)
|
||||
dec(pool.busyCount)
|
||||
pool.event.fire()
|
||||
found = true
|
||||
break
|
||||
if not found:
|
||||
raise newException(TransportPoolError, "Transport not bound to pool!")
|
||||
else:
|
||||
raise newException(TransportPoolError, "Pool is not ready!")
|
||||
|
||||
proc join*(
|
||||
pool: TransportPool
|
||||
) {.async: (raises: [TransportPoolError, CancelledError]).} =
|
||||
## Waiting for all connection to become available.
|
||||
if pool.state in {Connected, Closing}:
|
||||
while true:
|
||||
if pool.busyCount == 0:
|
||||
break
|
||||
else:
|
||||
await pool.event.wait()
|
||||
pool.event.clear()
|
||||
elif pool.state == Connecting:
|
||||
raise newException(TransportPoolError, "Pool is not ready!")
|
||||
|
||||
proc close*(
|
||||
pool: TransportPool
|
||||
) {.async: (raises: [TransportPoolError, CancelledError]).} =
|
||||
## Closes transports pool ``pool`` and release all resources.
|
||||
if pool.state == Connected:
|
||||
pool.state = Closing
|
||||
# Waiting for all transports to become available.
|
||||
await pool.join()
|
||||
# Closing all transports
|
||||
var pending = newSeq[Future[void]](len(pool.transports))
|
||||
for i in 0 ..< len(pool.transports):
|
||||
let transp = pool.transports[i].transp
|
||||
transp.close()
|
||||
pending[i] = transp.join()
|
||||
# Waiting for all transports to be closed
|
||||
await waitAll(pending)
|
||||
# Mark pool as `Closed`.
|
||||
pool.state = Closed
|
||||
@@ -150,57 +150,6 @@ proc createStreamServer*[T](
|
||||
except CatchableError as exc:
|
||||
raise newException(LPError, "failed simpler createStreamServer: " & exc.msg, exc)
|
||||
|
||||
proc createAsyncSocket*(ma: MultiAddress): AsyncFD {.raises: [ValueError, LPError].} =
|
||||
## Create new asynchronous socket using MultiAddress' ``ma`` socket type and
|
||||
## protocol information.
|
||||
##
|
||||
## Returns ``asyncInvalidSocket`` on error.
|
||||
##
|
||||
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
|
||||
##
|
||||
|
||||
var
|
||||
socktype: SockType = SockType.SOCK_STREAM
|
||||
protocol: Protocol = Protocol.IPPROTO_TCP
|
||||
|
||||
let address = initTAddress(ma).tryGet()
|
||||
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
|
||||
if ma[1].tryGet().protoCode().tryGet() == multiCodec("udp"):
|
||||
socktype = SockType.SOCK_DGRAM
|
||||
protocol = Protocol.IPPROTO_UDP
|
||||
elif ma[1].tryGet().protoCode().tryGet() == multiCodec("tcp"):
|
||||
socktype = SockType.SOCK_STREAM
|
||||
protocol = Protocol.IPPROTO_TCP
|
||||
elif address.family in {AddressFamily.Unix}:
|
||||
socktype = SockType.SOCK_STREAM
|
||||
protocol = cast[Protocol](0)
|
||||
else:
|
||||
return asyncInvalidSocket
|
||||
|
||||
try:
|
||||
createAsyncSocket(address.getDomain(), socktype, protocol)
|
||||
except CatchableError as exc:
|
||||
raise newException(
|
||||
LPError, "Convert exception to LPError in createAsyncSocket: " & exc.msg, exc
|
||||
)
|
||||
|
||||
proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool {.raises: [LPError].} =
|
||||
## Bind socket ``sock`` to MultiAddress ``ma``.
|
||||
##
|
||||
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
|
||||
##
|
||||
|
||||
var
|
||||
saddr: Sockaddr_storage
|
||||
slen: SockLen
|
||||
|
||||
let address = initTAddress(ma).tryGet()
|
||||
toSAddr(address, saddr, slen)
|
||||
if bindSocket(SocketHandle(sock), cast[ptr SockAddr](addr saddr), slen) == 0:
|
||||
result = true
|
||||
else:
|
||||
result = false
|
||||
|
||||
proc getLocalAddress*(sock: AsyncFD): TransportAddress =
|
||||
## Retrieve local socket ``sock`` address.
|
||||
##
|
||||
|
||||
@@ -1,123 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Copyright (c) 2018-2020 Status Research & Development GmbH. Licensed under
|
||||
# either of:
|
||||
# - Apache License, version 2.0
|
||||
# - MIT license
|
||||
# at your option. This file may not be copied, modified, or distributed except
|
||||
# according to those terms.
|
||||
|
||||
set -e
|
||||
|
||||
force=false
|
||||
verbose=false
|
||||
CACHE_DIR=""
|
||||
LIBP2P_COMMIT="124530a3"
|
||||
|
||||
while [[ "$#" -gt 0 ]]; do
|
||||
case "$1" in
|
||||
-f|--force) force=true ;;
|
||||
-v|--verbose) verbose=true ;;
|
||||
-h|--help)
|
||||
echo "Usage: $0 [-f|--force] [-v|--verbose] [CACHE_DIR] [COMMIT]"
|
||||
exit 0
|
||||
;;
|
||||
*)
|
||||
# First non-option is CACHE_DIR, second is LIBP2P_COMMIT
|
||||
if [[ -z "$CACHE_DIR" ]]; then
|
||||
CACHE_DIR="$1"
|
||||
elif [[ "$LIBP2P_COMMIT" == "124530a3" ]]; then
|
||||
LIBP2P_COMMIT="$1"
|
||||
else
|
||||
echo "Unknown argument: $1"
|
||||
exit 1
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
shift
|
||||
done
|
||||
|
||||
SUBREPO_DIR="vendor/go/src/github.com/libp2p/go-libp2p-daemon"
|
||||
if [[ ! -e "$SUBREPO_DIR" ]]; then
|
||||
SUBREPO_DIR="go-libp2p-daemon"
|
||||
rm -rf "$SUBREPO_DIR"
|
||||
git clone -q https://github.com/libp2p/go-libp2p-daemon
|
||||
cd "$SUBREPO_DIR"
|
||||
git checkout -q "$LIBP2P_COMMIT"
|
||||
cd ..
|
||||
fi
|
||||
|
||||
## env vars
|
||||
[[ -z "$BUILD_MSG" ]] && BUILD_MSG="Building p2pd ${LIBP2P_COMMIT}"
|
||||
|
||||
# Windows detection
|
||||
if uname | grep -qiE "mingw|msys"; then
|
||||
EXE_SUFFIX=".exe"
|
||||
# otherwise it fails in AppVeyor due to https://github.com/git-for-windows/git/issues/2495
|
||||
GIT_TIMESTAMP_ARG="--date=unix" # available since Git 2.9.4
|
||||
else
|
||||
EXE_SUFFIX=""
|
||||
GIT_TIMESTAMP_ARG="--date=format-local:%s" # available since Git 2.7.0
|
||||
fi
|
||||
|
||||
TARGET_DIR="$(go env GOPATH)/bin"
|
||||
TARGET_BINARY="${TARGET_DIR}/p2pd${EXE_SUFFIX}"
|
||||
|
||||
target_needs_rebuilding() {
|
||||
REBUILD=0
|
||||
NO_REBUILD=1
|
||||
|
||||
if [[ -n "$CACHE_DIR" && -e "${CACHE_DIR}/p2pd${EXE_SUFFIX}" ]]; then
|
||||
mkdir -p "${TARGET_DIR}"
|
||||
cp -a "$CACHE_DIR"/* "${TARGET_DIR}/"
|
||||
fi
|
||||
|
||||
# compare the built commit's timestamp to the date of the last commit (keep in mind that Git doesn't preserve file timestamps)
|
||||
if [[ -e "${TARGET_DIR}/timestamp" && $(cat "${TARGET_DIR}/timestamp") -eq $(cd "$SUBREPO_DIR"; git log --pretty=format:%cd -n 1 ${GIT_TIMESTAMP_ARG}) ]]; then
|
||||
return $NO_REBUILD
|
||||
else
|
||||
return $REBUILD
|
||||
fi
|
||||
}
|
||||
|
||||
build_target() {
|
||||
echo -e "$BUILD_MSG"
|
||||
|
||||
pushd "$SUBREPO_DIR"
|
||||
# Go module downloads can fail randomly in CI VMs, so retry them a few times
|
||||
MAX_RETRIES=5
|
||||
CURR=0
|
||||
while [[ $CURR -lt $MAX_RETRIES ]]; do
|
||||
FAILED=0
|
||||
go get ./... && break || FAILED=1
|
||||
CURR=$(( CURR + 1 ))
|
||||
if $verbose; then
|
||||
echo "retry #${CURR}"
|
||||
fi
|
||||
done
|
||||
if [[ $FAILED == 1 ]]; then
|
||||
echo "Error: still fails after retrying ${MAX_RETRIES} times."
|
||||
exit 1
|
||||
fi
|
||||
go install ./...
|
||||
|
||||
# record the last commit's timestamp
|
||||
git log --pretty=format:%cd -n 1 ${GIT_TIMESTAMP_ARG} > "${TARGET_DIR}/timestamp"
|
||||
|
||||
popd
|
||||
|
||||
# update the CI cache
|
||||
if [[ -n "$CACHE_DIR" ]]; then
|
||||
rm -rf "$CACHE_DIR"
|
||||
mkdir "$CACHE_DIR"
|
||||
cp -a "$TARGET_DIR"/* "$CACHE_DIR"/
|
||||
fi
|
||||
echo "Binary built successfully: $TARGET_BINARY"
|
||||
}
|
||||
|
||||
if $force || target_needs_rebuilding; then
|
||||
build_target
|
||||
else
|
||||
echo "No rebuild needed."
|
||||
fi
|
||||
|
||||
@@ -1,665 +0,0 @@
|
||||
import chronos, chronicles, stew/byteutils
|
||||
import helpers
|
||||
import ../libp2p
|
||||
import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto]
|
||||
import ../libp2p/protocols/connectivity/relay/[relay, client, utils]
|
||||
|
||||
type
|
||||
SwitchCreator = proc(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov = proc(config: TransportConfig): Transport =
|
||||
TcpTransport.new({}, config.upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.gcsafe, raises: [LPError].}
|
||||
DaemonPeerInfo = daemonapi.PeerInfo
|
||||
|
||||
proc writeLp(s: StreamTransport, msg: string | seq[byte]): Future[int] {.gcsafe.} =
|
||||
## write lenght prefixed
|
||||
var buf = initVBuffer()
|
||||
buf.writeSeq(msg)
|
||||
buf.finish()
|
||||
result = s.write(buf.buffer)
|
||||
|
||||
proc readLp(s: StreamTransport): Future[seq[byte]] {.async.} =
|
||||
## read length prefixed msg
|
||||
var
|
||||
size: uint
|
||||
length: int
|
||||
res: VarintResult[void]
|
||||
result = newSeq[byte](10)
|
||||
|
||||
for i in 0 ..< len(result):
|
||||
await s.readExactly(addr result[i], 1)
|
||||
res = LP.getUVarint(result.toOpenArray(0, i), length, size)
|
||||
if res.isOk():
|
||||
break
|
||||
res.expect("Valid varint")
|
||||
result.setLen(size)
|
||||
if size > 0.uint:
|
||||
await s.readExactly(addr result[0], int(size))
|
||||
|
||||
proc testPubSubDaemonPublish(
|
||||
gossip: bool = false, count: int = 1, swCreator: SwitchCreator
|
||||
) {.async.} =
|
||||
var pubsubData = "TEST MESSAGE"
|
||||
var testTopic = "test-topic"
|
||||
var msgData = pubsubData.toBytes()
|
||||
|
||||
var flags = {PSFloodSub}
|
||||
if gossip:
|
||||
flags = {PSGossipSub}
|
||||
|
||||
let daemonNode = await newDaemonApi(flags)
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let nativeNode = swCreator()
|
||||
|
||||
let pubsub =
|
||||
if gossip:
|
||||
GossipSub.init(switch = nativeNode).PubSub
|
||||
else:
|
||||
FloodSub.init(switch = nativeNode).PubSub
|
||||
|
||||
nativeNode.mount(pubsub)
|
||||
|
||||
await nativeNode.start()
|
||||
await pubsub.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
var finished = false
|
||||
var times = 0
|
||||
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
|
||||
let smsg = string.fromBytes(data)
|
||||
check smsg == pubsubData
|
||||
times.inc()
|
||||
if times >= count and not finished:
|
||||
finished = true
|
||||
|
||||
await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
|
||||
proc pubsubHandler(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async: (raises: [CatchableError]).} =
|
||||
result = true # don't cancel subscription
|
||||
|
||||
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
|
||||
pubsub.subscribe(testTopic, nativeHandler)
|
||||
await sleepAsync(3.seconds)
|
||||
|
||||
proc publisher() {.async.} =
|
||||
while not finished:
|
||||
await daemonNode.pubsubPublish(testTopic, msgData)
|
||||
await sleepAsync(250.millis)
|
||||
|
||||
await wait(publisher(), 5.minutes) # should be plenty of time
|
||||
|
||||
await nativeNode.stop()
|
||||
await pubsub.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
proc testPubSubNodePublish(
|
||||
gossip: bool = false, count: int = 1, swCreator: SwitchCreator
|
||||
) {.async.} =
|
||||
var pubsubData = "TEST MESSAGE"
|
||||
var testTopic = "test-topic"
|
||||
var msgData = pubsubData.toBytes()
|
||||
|
||||
var flags = {PSFloodSub}
|
||||
if gossip:
|
||||
flags = {PSGossipSub}
|
||||
|
||||
let daemonNode = await newDaemonApi(flags)
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let nativeNode = swCreator()
|
||||
|
||||
let pubsub =
|
||||
if gossip:
|
||||
GossipSub.init(switch = nativeNode).PubSub
|
||||
else:
|
||||
FloodSub.init(switch = nativeNode).PubSub
|
||||
|
||||
nativeNode.mount(pubsub)
|
||||
|
||||
await nativeNode.start()
|
||||
await pubsub.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
|
||||
var times = 0
|
||||
var finished = false
|
||||
proc pubsubHandler(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async: (raises: [CatchableError]).} =
|
||||
let smsg = string.fromBytes(message.data)
|
||||
check smsg == pubsubData
|
||||
times.inc()
|
||||
if times >= count and not finished:
|
||||
finished = true
|
||||
result = true # don't cancel subscription
|
||||
|
||||
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
|
||||
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
|
||||
discard
|
||||
|
||||
pubsub.subscribe(testTopic, nativeHandler)
|
||||
await sleepAsync(3.seconds)
|
||||
|
||||
proc publisher() {.async.} =
|
||||
while not finished:
|
||||
discard await pubsub.publish(testTopic, msgData)
|
||||
await sleepAsync(250.millis)
|
||||
|
||||
await wait(publisher(), 5.minutes) # should be plenty of time
|
||||
|
||||
check finished
|
||||
await nativeNode.stop()
|
||||
await pubsub.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
|
||||
suite "Interop using " & name:
|
||||
# TODO: chronos transports are leaking,
|
||||
# but those are tracked for both the daemon
|
||||
# and libp2p, so not sure which one it is,
|
||||
# need to investigate more
|
||||
# teardown:
|
||||
# checkTrackers()
|
||||
|
||||
# TODO: this test is failing sometimes on windows
|
||||
# For some reason we receive EOF before test 4 sometimes
|
||||
|
||||
asyncTest "native -> daemon multiple reads and writes":
|
||||
var protos = @["/test-stream"]
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
await nativeNode.start()
|
||||
let daemonNode = await newDaemonApi()
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
|
||||
var testFuture = newFuture[void]("test.future")
|
||||
proc daemonHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
check string.fromBytes(await stream.transp.readLp()) == "test 1"
|
||||
discard await stream.transp.writeLp("test 2")
|
||||
check string.fromBytes(await stream.transp.readLp()) == "test 3"
|
||||
discard await stream.transp.writeLp("test 4")
|
||||
testFuture.complete()
|
||||
|
||||
await daemonNode.addHandler(protos, daemonHandler)
|
||||
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
|
||||
await conn.writeLp("test 1")
|
||||
check "test 2" == string.fromBytes((await conn.readLp(1024)))
|
||||
|
||||
await conn.writeLp("test 3")
|
||||
check "test 4" == string.fromBytes((await conn.readLp(1024)))
|
||||
|
||||
await wait(testFuture, 10.secs)
|
||||
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
asyncTest "native -> daemon connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
# We are preparing expect string, which should be prefixed with varint
|
||||
# length and do not have `\r\n` suffix, because we going to use
|
||||
# readLine().
|
||||
var buffer = initVBuffer()
|
||||
buffer.writeSeq(test & "\r\n")
|
||||
buffer.finish()
|
||||
var expect = newString(len(buffer) - 2)
|
||||
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
await nativeNode.start()
|
||||
|
||||
let daemonNode = await newDaemonApi()
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc daemonHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
# We should perform `readLp()` instead of `readLine()`. `readLine()`
|
||||
# here reads actually length prefixed string.
|
||||
var line = await stream.transp.readLine()
|
||||
check line == expect
|
||||
testFuture.complete(line)
|
||||
await stream.close()
|
||||
|
||||
await daemonNode.addHandler(protos, daemonHandler)
|
||||
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
|
||||
await conn.writeLp(test & "\r\n")
|
||||
check expect == (await wait(testFuture, 10.secs))
|
||||
|
||||
await conn.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "daemon -> native connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc nativeHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
var line = string.fromBytes(await conn.readLp(1024))
|
||||
check line == test
|
||||
testFuture.complete(line)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi()
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
discard await stream.transp.writeLp(test)
|
||||
|
||||
check test == (await wait(testFuture, 10.secs))
|
||||
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
asyncTest "native -> daemon websocket connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc nativeHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
var line = string.fromBytes(await conn.readLp(1024))
|
||||
check line == test
|
||||
testFuture.complete(line)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
|
||||
|
||||
let nativeNode = swCreator(
|
||||
ma = wsAddress,
|
||||
prov = proc(config: TransportConfig): Transport =
|
||||
WsTransport.new(config.upgr),
|
||||
)
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
discard await stream.transp.writeLp(test)
|
||||
|
||||
check test == (await wait(testFuture, 10.secs))
|
||||
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
asyncTest "daemon -> native websocket connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
# We are preparing expect string, which should be prefixed with varint
|
||||
# length and do not have `\r\n` suffix, because we going to use
|
||||
# readLine().
|
||||
var buffer = initVBuffer()
|
||||
buffer.writeSeq(test & "\r\n")
|
||||
buffer.finish()
|
||||
var expect = newString(len(buffer) - 2)
|
||||
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
|
||||
|
||||
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
|
||||
let nativeNode = SwitchBuilder
|
||||
.new()
|
||||
.withAddress(wsAddress)
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withWsTransport()
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
await nativeNode.start()
|
||||
|
||||
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc daemonHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
# We should perform `readLp()` instead of `readLine()`. `readLine()`
|
||||
# here reads actually length prefixed string.
|
||||
var line = await stream.transp.readLine()
|
||||
check line == expect
|
||||
testFuture.complete(line)
|
||||
await stream.close()
|
||||
|
||||
await daemonNode.addHandler(protos, daemonHandler)
|
||||
let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0])
|
||||
await conn.writeLp(test & "\r\n")
|
||||
check expect == (await wait(testFuture, 10.secs))
|
||||
|
||||
await conn.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "daemon -> multiple reads and writes":
|
||||
var protos = @["/test-stream"]
|
||||
|
||||
var testFuture = newFuture[void]("test.future")
|
||||
proc nativeHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
check "test 1" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("test 2".toBytes())
|
||||
|
||||
check "test 3" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("test 4".toBytes())
|
||||
|
||||
testFuture.complete()
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi()
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
|
||||
asyncDiscard stream.transp.writeLp("test 1")
|
||||
check "test 2" == string.fromBytes(await stream.transp.readLp())
|
||||
|
||||
asyncDiscard stream.transp.writeLp("test 3")
|
||||
check "test 4" == string.fromBytes(await stream.transp.readLp())
|
||||
|
||||
await wait(testFuture, 10.secs)
|
||||
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "read write multiple":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var count = 0
|
||||
var testFuture = newFuture[int]("test.future")
|
||||
proc nativeHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
while count < 10:
|
||||
var line = string.fromBytes(await conn.readLp(1024))
|
||||
check line == test
|
||||
await conn.writeLp(test.toBytes())
|
||||
count.inc()
|
||||
|
||||
testFuture.complete(count)
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let nativeNode = swCreator()
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi()
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
|
||||
var count2 = 0
|
||||
while count2 < 10:
|
||||
discard await stream.transp.writeLp(test)
|
||||
let line = await stream.transp.readLp()
|
||||
check test == string.fromBytes(line)
|
||||
inc(count2)
|
||||
|
||||
check 10 == (await wait(testFuture, 1.minutes))
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "floodsub: daemon publish one":
|
||||
await testPubSubDaemonPublish(swCreator = swCreator)
|
||||
|
||||
asyncTest "floodsub: daemon publish many":
|
||||
await testPubSubDaemonPublish(count = 10, swCreator = swCreator)
|
||||
|
||||
asyncTest "gossipsub: daemon publish one":
|
||||
await testPubSubDaemonPublish(gossip = true, swCreator = swCreator)
|
||||
|
||||
asyncTest "gossipsub: daemon publish many":
|
||||
await testPubSubDaemonPublish(gossip = true, count = 10, swCreator = swCreator)
|
||||
|
||||
asyncTest "floodsub: node publish one":
|
||||
await testPubSubNodePublish(swCreator = swCreator)
|
||||
|
||||
asyncTest "floodsub: node publish many":
|
||||
await testPubSubNodePublish(count = 10, swCreator = swCreator)
|
||||
|
||||
asyncTest "gossipsub: node publish one":
|
||||
await testPubSubNodePublish(gossip = true, swCreator = swCreator)
|
||||
|
||||
asyncTest "gossipsub: node publish many":
|
||||
await testPubSubNodePublish(gossip = true, count = 10, swCreator = swCreator)
|
||||
|
||||
proc relayInteropTests*(name: string, relayCreator: SwitchCreator) =
|
||||
suite "Interop relay using " & name:
|
||||
asyncTest "NativeSrc -> NativeRelay -> DaemonDst":
|
||||
let closeBlocker = newFuture[void]()
|
||||
let daemonFinished = newFuture[void]()
|
||||
# TODO: This Future blocks the daemonHandler after sending the last message.
|
||||
# It exists because there's a strange behavior where stream.close sends
|
||||
# a Rst instead of Fin. We should investigate this at some point.
|
||||
proc daemonHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
check "line1" == string.fromBytes(await stream.transp.readLp())
|
||||
discard await stream.transp.writeLp("line2")
|
||||
check "line3" == string.fromBytes(await stream.transp.readLp())
|
||||
discard await stream.transp.writeLp("line4")
|
||||
await closeBlocker
|
||||
await stream.close()
|
||||
daemonFinished.complete()
|
||||
|
||||
let
|
||||
maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
src = relayCreator(maSrc, relay = RelayClient.new(circuitRelayV1 = true))
|
||||
rel = relayCreator(maRel)
|
||||
|
||||
await src.start()
|
||||
await rel.start()
|
||||
let daemonNode = await newDaemonApi()
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let maStr =
|
||||
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
|
||||
let maddr = MultiAddress.init(maStr).tryGet()
|
||||
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
|
||||
await rel.connect(daemonPeer.peer, daemonPeer.addresses)
|
||||
|
||||
await daemonNode.addHandler(@["/testCustom"], daemonHandler)
|
||||
|
||||
let conn = await src.dial(daemonPeer.peer, @[maddr], @["/testCustom"])
|
||||
|
||||
await conn.writeLp("line1")
|
||||
check string.fromBytes(await conn.readLp(1024)) == "line2"
|
||||
|
||||
await conn.writeLp("line3")
|
||||
check string.fromBytes(await conn.readLp(1024)) == "line4"
|
||||
|
||||
closeBlocker.complete()
|
||||
await daemonFinished
|
||||
await conn.close()
|
||||
await allFutures(src.stop(), rel.stop())
|
||||
try:
|
||||
await daemonNode.close()
|
||||
except CatchableError as e:
|
||||
when defined(windows):
|
||||
# On Windows, daemon close may fail due to socket race condition
|
||||
# This is expected behavior and can be safely ignored
|
||||
discard
|
||||
else:
|
||||
raise e
|
||||
|
||||
asyncTest "DaemonSrc -> NativeRelay -> NativeDst":
|
||||
proc customHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
check "line1" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("line2")
|
||||
check "line3" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("line4")
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
let protos = @["/customProto", RelayV1Codec]
|
||||
var customProto = new LPProtocol
|
||||
customProto.handler = customHandler
|
||||
customProto.codec = protos[0]
|
||||
let
|
||||
maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
rel = relayCreator(maRel)
|
||||
dst = relayCreator(maDst, relay = RelayClient.new())
|
||||
|
||||
dst.mount(customProto)
|
||||
await rel.start()
|
||||
await dst.start()
|
||||
let daemonNode = await newDaemonApi()
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let maStr =
|
||||
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
|
||||
let maddr = MultiAddress.init(maStr).tryGet()
|
||||
await daemonNode.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
|
||||
await rel.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
|
||||
await daemonNode.connect(dst.peerInfo.peerId, @[maddr])
|
||||
var stream = await daemonNode.openStream(dst.peerInfo.peerId, protos)
|
||||
|
||||
discard await stream.transp.writeLp("line1")
|
||||
check string.fromBytes(await stream.transp.readLp()) == "line2"
|
||||
discard await stream.transp.writeLp("line3")
|
||||
check string.fromBytes(await stream.transp.readLp()) == "line4"
|
||||
|
||||
await allFutures(dst.stop(), rel.stop())
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "NativeSrc -> DaemonRelay -> NativeDst":
|
||||
proc customHandler(
|
||||
conn: Connection, proto: string
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
check "line1" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("line2")
|
||||
check "line3" == string.fromBytes(await conn.readLp(1024))
|
||||
await conn.writeLp("line4")
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except CatchableError:
|
||||
check false # should not be here
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
let protos = @["/customProto", RelayV1Codec]
|
||||
var customProto = new LPProtocol
|
||||
customProto.handler = customHandler
|
||||
customProto.codec = protos[0]
|
||||
let
|
||||
maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
src = relayCreator(maSrc, relay = RelayClient.new())
|
||||
dst = relayCreator(maDst, relay = RelayClient.new())
|
||||
|
||||
dst.mount(customProto)
|
||||
await src.start()
|
||||
await dst.start()
|
||||
let daemonNode = await newDaemonApi({RelayHop})
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
let maStr = $daemonPeer.addresses[0] & "/p2p/" & $daemonPeer.peer & "/p2p-circuit"
|
||||
let maddr = MultiAddress.init(maStr).tryGet()
|
||||
await src.connect(daemonPeer.peer, daemonPeer.addresses)
|
||||
await daemonNode.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
|
||||
let conn = await src.dial(dst.peerInfo.peerId, @[maddr], protos[0])
|
||||
|
||||
await conn.writeLp("line1")
|
||||
check string.fromBytes(await conn.readLp(1024)) == "line2"
|
||||
|
||||
await conn.writeLp("line3")
|
||||
check string.fromBytes(await conn.readLp(1024)) == "line4"
|
||||
|
||||
await allFutures(src.stop(), dst.stop())
|
||||
await daemonNode.close()
|
||||
@@ -1,3 +1,3 @@
|
||||
{.used.}
|
||||
|
||||
import testnative, ./pubsub/testpubsub, testinterop, testdaemon
|
||||
import testnative, ./pubsub/testpubsub
|
||||
|
||||
@@ -1,123 +0,0 @@
|
||||
import chronos, unittest2, helpers
|
||||
import
|
||||
../libp2p/daemon/daemonapi,
|
||||
../libp2p/multiaddress,
|
||||
../libp2p/multicodec,
|
||||
../libp2p/cid,
|
||||
../libp2p/multihash,
|
||||
../libp2p/peerid
|
||||
|
||||
when defined(nimHasUsed):
|
||||
{.used.}
|
||||
|
||||
proc identitySpawnTest(): Future[bool] {.async.} =
|
||||
var api = await newDaemonApi()
|
||||
var data = await api.identity()
|
||||
await api.close()
|
||||
result = true
|
||||
|
||||
proc connectStreamTest(): Future[bool] {.async.} =
|
||||
var api1 = await newDaemonApi()
|
||||
var api2 = await newDaemonApi()
|
||||
|
||||
var id1 = await api1.identity()
|
||||
var id2 = await api2.identity()
|
||||
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
|
||||
proc streamHandler(
|
||||
api: DaemonAPI, stream: P2PStream
|
||||
) {.async: (raises: [CatchableError]).} =
|
||||
var line = await stream.transp.readLine()
|
||||
testFuture.complete(line)
|
||||
|
||||
await api2.addHandler(protos, streamHandler)
|
||||
await api1.connect(id2.peer, id2.addresses)
|
||||
# echo await api1.listPeers()
|
||||
var stream = await api1.openStream(id2.peer, protos)
|
||||
let sent = await stream.transp.write(test & "\r\n")
|
||||
doAssert(sent == len(test) + 2)
|
||||
doAssert((await wait(testFuture, 10.seconds)) == test)
|
||||
await stream.close()
|
||||
await api1.close()
|
||||
await api2.close()
|
||||
result = true
|
||||
|
||||
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
|
||||
var pubsubData = "TEST MESSAGE"
|
||||
var msgData = cast[seq[byte]](pubsubData)
|
||||
var api1, api2: DaemonAPI
|
||||
|
||||
api1 = await newDaemonApi(f)
|
||||
api2 = await newDaemonApi(f)
|
||||
|
||||
var id1 = await api1.identity()
|
||||
var id2 = await api2.identity()
|
||||
|
||||
var resultsCount = 0
|
||||
|
||||
var handlerFuture1 = newFuture[void]()
|
||||
var handlerFuture2 = newFuture[void]()
|
||||
|
||||
proc pubsubHandler1(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async: (raises: [CatchableError]).} =
|
||||
let smsg = cast[string](message.data)
|
||||
if smsg == pubsubData:
|
||||
inc(resultsCount)
|
||||
handlerFuture1.complete()
|
||||
# Callback must return `false` to close subscription channel.
|
||||
result = false
|
||||
|
||||
proc pubsubHandler2(
|
||||
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
|
||||
): Future[bool] {.async: (raises: [CatchableError]).} =
|
||||
let smsg = cast[string](message.data)
|
||||
if smsg == pubsubData:
|
||||
inc(resultsCount)
|
||||
handlerFuture2.complete()
|
||||
# Callback must return `false` to close subscription channel.
|
||||
result = false
|
||||
|
||||
await api1.connect(id2.peer, id2.addresses)
|
||||
await api2.connect(id1.peer, id1.addresses)
|
||||
|
||||
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
|
||||
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
var topics1 = await api1.pubsubGetTopics()
|
||||
var topics2 = await api2.pubsubGetTopics()
|
||||
|
||||
if len(topics1) == 1 and len(topics2) == 1:
|
||||
var peers1 = await api1.pubsubListPeers("test-topic")
|
||||
var peers2 = await api2.pubsubListPeers("test-topic")
|
||||
if len(peers1) == 1 and len(peers2) == 1:
|
||||
# Publish test data via api1.
|
||||
await sleepAsync(250.milliseconds)
|
||||
await api1.pubsubPublish("test-topic", msgData)
|
||||
var res =
|
||||
await one(allFutures(handlerFuture1, handlerFuture2), sleepAsync(5.seconds))
|
||||
|
||||
await api1.close()
|
||||
await api2.close()
|
||||
if resultsCount == 2:
|
||||
result = true
|
||||
|
||||
suite "libp2p-daemon test suite":
|
||||
test "Simple spawn and get identity test":
|
||||
check:
|
||||
waitFor(identitySpawnTest()) == true
|
||||
test "Connect/Accept peer/stream test":
|
||||
check:
|
||||
waitFor(connectStreamTest()) == true
|
||||
asyncTest "GossipSub test":
|
||||
checkUntilTimeoutCustom(10.seconds, 100.milliseconds):
|
||||
(await pubsubTest({PSGossipSub}))
|
||||
asyncTest "FloodSub test":
|
||||
checkUntilTimeoutCustom(10.seconds, 100.milliseconds):
|
||||
(await pubsubTest({PSFloodSub}))
|
||||
@@ -1,58 +0,0 @@
|
||||
{.used.}
|
||||
|
||||
import helpers, commoninterop
|
||||
import ../libp2p
|
||||
import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/relay
|
||||
|
||||
proc switchMplexCreator(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov = proc(config: TransportConfig): Transport =
|
||||
TcpTransport.new({}, config.upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.raises: [LPError].} =
|
||||
SwitchBuilder
|
||||
.new()
|
||||
.withSignedPeerRecord(false)
|
||||
.withMaxConnections(MaxConnections)
|
||||
.withRng(crypto.newRng())
|
||||
.withAddresses(@[ma])
|
||||
.withMaxIn(-1)
|
||||
.withMaxOut(-1)
|
||||
.withTransport(prov)
|
||||
.withMplex()
|
||||
.withMaxConnsPerPeer(MaxConnectionsPerPeer)
|
||||
.withPeerStore(capacity = 1000)
|
||||
.withNoise()
|
||||
.withCircuitRelay(relay)
|
||||
.withNameResolver(nil)
|
||||
.build()
|
||||
|
||||
proc switchYamuxCreator(
|
||||
ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
|
||||
prov = proc(config: TransportConfig): Transport =
|
||||
TcpTransport.new({}, config.upgr),
|
||||
relay: Relay = Relay.new(circuitRelayV1 = true),
|
||||
): Switch {.raises: [LPError].} =
|
||||
SwitchBuilder
|
||||
.new()
|
||||
.withSignedPeerRecord(false)
|
||||
.withMaxConnections(MaxConnections)
|
||||
.withRng(crypto.newRng())
|
||||
.withAddresses(@[ma])
|
||||
.withMaxIn(-1)
|
||||
.withMaxOut(-1)
|
||||
.withTransport(prov)
|
||||
.withYamux()
|
||||
.withMaxConnsPerPeer(MaxConnectionsPerPeer)
|
||||
.withPeerStore(capacity = 1000)
|
||||
.withNoise()
|
||||
.withCircuitRelay(relay)
|
||||
.withNameResolver(nil)
|
||||
.build()
|
||||
|
||||
suite "Tests interop":
|
||||
commonInteropTests("mplex", switchMplexCreator)
|
||||
relayInteropTests("mplex", switchMplexCreator)
|
||||
|
||||
commonInteropTests("yamux", switchYamuxCreator)
|
||||
relayInteropTests("yamux", switchYamuxCreator)
|
||||
@@ -123,7 +123,7 @@ const
|
||||
"/ip4/127.0.0.1/ipfs", "/ip4/127.0.0.1/ipfs/tcp", "/p2p-circuit/50",
|
||||
]
|
||||
|
||||
PathVectors = ["/unix/tmp/p2pd.sock", "/unix/a/b/c/d/e/f/g/h/i.sock"]
|
||||
PathVectors = ["/unix/a/b/c/d/e/f/g/h/i.sock"]
|
||||
|
||||
PathExpects = [
|
||||
"90030E2F746D702F703270642E736F636B",
|
||||
|
||||
@@ -28,7 +28,6 @@ import
|
||||
builders,
|
||||
upgrademngrs/upgrade,
|
||||
varint,
|
||||
daemon/daemonapi,
|
||||
]
|
||||
import ./helpers
|
||||
|
||||
|
||||
Reference in New Issue
Block a user