diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ad2cb0752..b66eeb6ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,15 +72,6 @@ jobs: shell: ${{ matrix.shell }} nim_ref: ${{ matrix.nim.ref }} - - name: Setup Go - uses: actions/setup-go@v5 - with: - go-version: '~1.16.0' # That's the minimum Go version that works with arm. - - - name: Install p2pd - run: | - V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3 - - name: Restore deps from cache id: deps-cache uses: actions/cache@v3 diff --git a/.github/workflows/daily_common.yml b/.github/workflows/daily_common.yml index 9ab6af86f..4f24e672a 100644 --- a/.github/workflows/daily_common.yml +++ b/.github/workflows/daily_common.yml @@ -69,16 +69,6 @@ jobs: nim_ref: ${{ matrix.nim.ref }} cpu: ${{ matrix.cpu }} - - name: Setup Go - uses: actions/setup-go@v5 - with: - go-version: '~1.16.0' - cache: false - - - name: Install p2pd - run: | - V=1 bash scripts/build_p2pd.sh p2pdCache 124530a3 - - name: Install dependencies (pinned) if: ${{ inputs.pinned_deps }} run: | diff --git a/README.md b/README.md index 7401ab6a1..d4f8361e5 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ nimble install libp2p You'll find the nim-libp2p documentation [here](https://vacp2p.github.io/nim-libp2p/docs/). See [examples](./examples) for simple usage patterns. ## Getting Started -Try out the chat example. For this you'll need to have [`go-libp2p-daemon`](examples/go-daemon/daemonapi.md) running. Full code can be found [here](https://github.com/status-im/nim-libp2p/blob/master/examples/chat.nim): +Try out the chat example. Full code can be found [here](https://github.com/status-im/nim-libp2p/blob/master/examples/chat.nim): ```bash nim c -r --threads:on examples/directchat.nim @@ -81,12 +81,6 @@ Run unit tests: # run all the unit tests nimble test ``` -**Obs:** Running all tests requires the [`go-libp2p-daemon` to be installed and running](examples/go-daemon/daemonapi.md). - -If you only want to run tests that don't require `go-libp2p-daemon`, use: -``` -nimble testnative -``` For a list of all available test suites, use: ``` @@ -155,8 +149,6 @@ List of packages modules implemented in nim-libp2p: | [connmanager](libp2p/connmanager.nim) | Connection manager | | [identify / push identify](libp2p/protocols/identify.nim) | [Identify](https://docs.libp2p.io/concepts/fundamentals/protocols/#identify) protocol | | [ping](libp2p/protocols/ping.nim) | [Ping](https://docs.libp2p.io/concepts/fundamentals/protocols/#ping) protocol | -| [libp2p-daemon-client](libp2p/daemon/daemonapi.nim) | [go-daemon](https://github.com/libp2p/go-libp2p-daemon) nim wrapper | -| [interop-libp2p](tests/testinterop.nim) | Interop tests | | **Transports** | | | [libp2p-tcp](libp2p/transports/tcptransport.nim) | TCP transport | | [libp2p-ws](libp2p/transports/wstransport.nim) | WebSocket & WebSocket Secure transport | diff --git a/examples/go-daemon/bootstrap.nim b/examples/go-daemon/bootstrap.nim deleted file mode 100644 index 1d0ff97ed..000000000 --- a/examples/go-daemon/bootstrap.nim +++ /dev/null @@ -1,54 +0,0 @@ -import chronos, nimcrypto, strutils -import ../../libp2p/daemon/daemonapi -import ../hexdump - -const PubSubTopic = "test-net" - -proc dumpSubscribedPeers(api: DaemonAPI) {.async.} = - var peers = await api.pubsubListPeers(PubSubTopic) - echo "= List of connected and subscribed peers:" - for item in peers: - echo item.pretty() - -proc dumpAllPeers(api: DaemonAPI) {.async.} = - var peers = await api.listPeers() - echo "Current connected peers count = ", len(peers) - for item in peers: - echo item.peer.pretty() - -proc monitor(api: DaemonAPI) {.async.} = - while true: - echo "Dumping all peers" - await dumpAllPeers(api) - await sleepAsync(5000) - -proc main() {.async.} = - echo "= Starting P2P bootnode" - var api = await newDaemonApi({DHTFull, PSGossipSub}) - var id = await api.identity() - echo "= P2P bootnode ", id.peer.pretty(), " started." - let mcip4 = multiCodec("ip4") - let mcip6 = multiCodec("ip6") - echo "= You can use one of this addresses to bootstrap your nodes:" - for item in id.addresses: - if item.protoCode() == mcip4 or item.protoCode() == mcip6: - echo $item & "/ipfs/" & id.peer.pretty() - - asyncSpawn monitor(api) - - proc pubsubLogger( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async.} = - let msglen = len(message.data) - echo "= Recieved pubsub message with length ", - msglen, " bytes from peer ", message.peer.pretty() - echo dumpHex(message.data) - await api.dumpSubscribedPeers() - result = true - - var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger) - -when isMainModule: - waitFor(main()) - while true: - poll() diff --git a/examples/go-daemon/chat.nim b/examples/go-daemon/chat.nim deleted file mode 100644 index fb1a20e32..000000000 --- a/examples/go-daemon/chat.nim +++ /dev/null @@ -1,132 +0,0 @@ -import chronos, nimcrypto, strutils -import ../../libp2p/daemon/daemonapi - -## nim c -r --threads:on chat.nim -when not (compileOption("threads")): - {.fatal: "Please, compile this program with the --threads:on option!".} - -const ServerProtocols = @["/test-chat-stream"] - -type CustomData = ref object - api: DaemonAPI - remotes: seq[StreamTransport] - consoleFd: AsyncFD - serveFut: Future[void] - -proc threadMain(wfd: AsyncFD) {.thread.} = - ## This procedure performs reading from `stdin` and sends data over - ## pipe to main thread. - var transp = fromPipe(wfd) - - while true: - var line = stdin.readLine() - let res = waitFor transp.write(line & "\r\n") - -proc serveThread(udata: CustomData) {.async.} = - ## This procedure perform reading on pipe and sends data to remote clients. - var transp = fromPipe(udata.consoleFd) - - proc remoteReader(transp: StreamTransport) {.async.} = - while true: - var line = await transp.readLine() - if len(line) == 0: - break - echo ">> ", line - - while true: - try: - var line = await transp.readLine() - if line.startsWith("/connect"): - var parts = line.split(" ") - if len(parts) == 2: - var peerId = PeerId.init(parts[1]) - var address = MultiAddress.init(multiCodec("p2p-circuit")) - address &= MultiAddress.init(multiCodec("p2p"), peerId) - echo "= Searching for peer ", peerId.pretty() - var id = await udata.api.dhtFindPeer(peerId) - echo "= Peer " & parts[1] & " found at addresses:" - for item in id.addresses: - echo $item - echo "= Connecting to peer ", $address - await udata.api.connect(peerId, @[address], 30) - echo "= Opening stream to peer chat ", parts[1] - var stream = await udata.api.openStream(peerId, ServerProtocols) - udata.remotes.add(stream.transp) - echo "= Connected to peer chat ", parts[1] - asyncSpawn remoteReader(stream.transp) - elif line.startsWith("/search"): - var parts = line.split(" ") - if len(parts) == 2: - var peerId = PeerId.init(parts[1]) - echo "= Searching for peer ", peerId.pretty() - var id = await udata.api.dhtFindPeer(peerId) - echo "= Peer " & parts[1] & " found at addresses:" - for item in id.addresses: - echo $item - elif line.startsWith("/consearch"): - var parts = line.split(" ") - if len(parts) == 2: - var peerId = PeerId.init(parts[1]) - echo "= Searching for peers connected to peer ", parts[1] - var peers = await udata.api.dhtFindPeersConnectedToPeer(peerId) - echo "= Found ", len(peers), " connected to peer ", parts[1] - for item in peers: - var peer = item.peer - var addresses = newSeq[string]() - var relay = false - for a in item.addresses: - addresses.add($a) - if a.protoName() == "/p2p-circuit": - relay = true - break - if relay: - echo peer.pretty(), " * ", " [", addresses.join(", "), "]" - else: - echo peer.pretty(), " [", addresses.join(", "), "]" - elif line.startsWith("/exit"): - break - else: - var msg = line & "\r\n" - echo "<< ", line - var pending = newSeq[Future[int]]() - for item in udata.remotes: - pending.add(item.write(msg)) - if len(pending) > 0: - var results = await all(pending) - except CatchableError as err: - echo err.msg - -proc main() {.async.} = - var data = new CustomData - data.remotes = newSeq[StreamTransport]() - - var (rfd, wfd) = createAsyncPipe() - if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: - raise newException(ValueError, "Could not initialize pipe!") - - data.consoleFd = rfd - - data.serveFut = serveThread(data) - var thread: Thread[AsyncFD] - thread.createThread(threadMain, wfd) - - echo "= Starting P2P node" - data.api = await newDaemonApi({DHTFull, Bootstrap}) - await sleepAsync(3000) - var id = await data.api.identity() - - proc streamHandler(api: DaemonAPI, stream: P2PStream) {.async.} = - echo "= Peer ", stream.peer.pretty(), " joined chat" - data.remotes.add(stream.transp) - while true: - var line = await stream.transp.readLine() - if len(line) == 0: - break - echo ">> ", line - - await data.api.addHandler(ServerProtocols, streamHandler) - echo "= Your PeerId is ", id.peer.pretty() - await data.serveFut - -when isMainModule: - waitFor(main()) diff --git a/examples/go-daemon/daemonapi.md b/examples/go-daemon/daemonapi.md deleted file mode 100644 index 8c1e5fcc3..000000000 --- a/examples/go-daemon/daemonapi.md +++ /dev/null @@ -1,43 +0,0 @@ -# Table of Contents -- [Introduction](#introduction) -- [Prerequisites](#prerequisites) -- [Installation](#installation) - - [Script](#script) -- [Examples](#examples) - -# Introduction -This is a libp2p-backed daemon wrapping the functionalities of go-libp2p for use in Nim.
-For more information about the go daemon, check out [this repository](https://github.com/libp2p/go-libp2p-daemon). -> **Required only** for running the tests. - -# Prerequisites -Go with version `1.16.0` -> You will *likely* be able to build `go-libp2p-daemon` with different Go versions, but **they haven't been tested**. - -# Installation -Run the build script while having the `go` command pointing to the correct Go version. -```sh -./scripts/build_p2pd.sh -``` -`build_p2pd.sh` will not rebuild unless needed. If you already have the newest binary and you want to force the rebuild, use: -```sh -./scripts/build_p2pd.sh -f -``` -Or: -```sh -./scripts/build_p2pd.sh --force -``` - -If everything goes correctly, the binary (`p2pd`) should be built and placed in the `$GOPATH/bin` directory. -If you're having issues, head into [our discord](https://discord.com/channels/864066763682218004/1115526869769535629) and ask for assistance. - -After successfully building the binary, remember to add it to your path so it can be found. You can do that by running: -```sh -export PATH="$PATH:$HOME/go/bin" -``` -> **Tip:** To make this change permanent, add the command above to your `.bashrc` file. - -# Examples -Examples can be found in the [examples folder](https://github.com/status-im/nim-libp2p/tree/readme/examples/go-daemon) - - diff --git a/examples/go-daemon/node.nim b/examples/go-daemon/node.nim deleted file mode 100644 index 4f44fe336..000000000 --- a/examples/go-daemon/node.nim +++ /dev/null @@ -1,46 +0,0 @@ -import chronos, nimcrypto, strutils, os -import ../../libp2p/daemon/daemonapi - -const PubSubTopic = "test-net" - -proc main(bn: string) {.async.} = - echo "= Starting P2P node" - var bootnodes = bn.split(",") - var api = await newDaemonApi( - {DHTFull, PSGossipSub, WaitBootstrap}, bootstrapNodes = bootnodes, peersRequired = 1 - ) - var id = await api.identity() - echo "= P2P node ", id.peer.pretty(), " started:" - for item in id.addresses: - echo item - - proc pubsubLogger( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async.} = - let msglen = len(message.data) - echo "= Recieved pubsub message with length ", - msglen, " bytes from peer ", message.peer.pretty(), ": " - var strdata = cast[string](message.data) - echo strdata - result = true - - var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger) - - # Waiting for gossipsub interval - while true: - var peers = await api.pubsubListPeers(PubSubTopic) - if len(peers) > 0: - break - await sleepAsync(1000) - - var data = "HELLO\r\n" - var msgData = cast[seq[byte]](data) - await api.pubsubPublish(PubSubTopic, msgData) - -when isMainModule: - if paramCount() != 1: - echo "Please supply bootnodes!" - else: - waitFor(main(paramStr(1))) - while true: - poll() diff --git a/libp2p.nimble b/libp2p.nimble index 08dffa14e..b2631a591 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -49,12 +49,6 @@ proc tutorialToMd(filename: string) = task testnative, "Runs libp2p native tests": runTest("testnative") -task testdaemon, "Runs daemon tests": - runTest("testdaemon") - -task testinterop, "Runs interop tests": - runTest("testinterop") - task testpubsub, "Runs pubsub tests": runTest("pubsub/testpubsub", "-d:libp2p_gossipsub_1_4") diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim deleted file mode 100644 index da2b1da12..000000000 --- a/libp2p/daemon/daemonapi.nim +++ /dev/null @@ -1,1545 +0,0 @@ -# Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -# * MIT license ([LICENSE-MIT](LICENSE-MIT)) -# at your option. -# This file may not be copied, modified, or distributed except according to -# those terms. - -{.push raises: [].} - -## This module implementes API for `go-libp2p-daemon`. -import std/[os, osproc, strutils, tables, strtabs, sequtils] -import pkg/[chronos, chronicles] -import ../varint, ../multiaddress, ../multicodec, ../cid, ../peerid -import ../wire, ../multihash, ../protobuf/minprotobuf, ../errors -import ../crypto/crypto, ../utility -import ../utils/sequninit - -export peerid, multiaddress, multicodec, multihash, cid, crypto, wire, errors - -when not defined(windows): - import posix - -const - DefaultSocketPath* = "/unix/tmp/p2pd.sock" - DefaultUnixSocketPattern* = "/unix/tmp/nim-p2pd-$1-$2.sock" - DefaultIpSocketPattern* = "/ip4/127.0.0.1/tcp/$2" - DefaultUnixChildPattern* = "/unix/tmp/nim-p2pd-handle-$1-$2.sock" - DefaultIpChildPattern* = "/ip4/127.0.0.1/tcp/$2" - DefaultDaemonFile* = "p2pd" - -type - IpfsLogLevel* {.pure.} = enum - Critical - Error - Warning - Notice - Info - Debug - Trace - - RequestType* {.pure.} = enum - IDENTIFY = 0 - CONNECT = 1 - STREAM_OPEN = 2 - STREAM_HANDLER = 3 - DHT = 4 - LIST_PEERS = 5 - CONNMANAGER = 6 - DISCONNECT = 7 - PUBSUB = 8 - - DHTRequestType* {.pure.} = enum - FIND_PEER = 0 - FIND_PEERS_CONNECTED_TO_PEER = 1 - FIND_PROVIDERS = 2 - GET_CLOSEST_PEERS = 3 - GET_PUBLIC_KEY = 4 - GET_VALUE = 5 - SEARCH_VALUE = 6 - PUT_VALUE = 7 - PROVIDE = 8 - - ConnManagerRequestType* {.pure.} = enum - TAG_PEER = 0 - UNTAG_PEER = 1 - TRIM = 2 - - PSRequestType* {.pure.} = enum - GET_TOPICS = 0 - LIST_PEERS = 1 - PUBLISH = 2 - SUBSCRIBE = 3 - - ResponseKind* = enum - Malformed - Error - Success - - ResponseType* {.pure.} = enum - ERROR = 2 - STREAMINFO = 3 - IDENTITY = 4 - DHT = 5 - PEERINFO = 6 - PUBSUB = 7 - - DHTResponseType* {.pure.} = enum - BEGIN = 0 - VALUE = 1 - END = 2 - - MultiProtocol* = string - DHTValue* = seq[byte] - - P2PStreamFlags* {.pure.} = enum - None - Closed - Inbound - Outbound - - P2PDaemonFlags* = enum - DHTClient ## Start daemon in DHT client mode - DHTFull ## Start daemon with full DHT support - Bootstrap ## Start daemon with bootstrap - WaitBootstrap - ## Start daemon with bootstrap and wait until daemon - ## establish connection to at least 2 peers - PSFloodSub ## Enable `FloodSub` protocol in daemon - PSGossipSub ## Enable `GossipSub` protocol in daemon - PSNoSign ## Disable pubsub message signing (default true) - PSStrictSign ## Force strict checking pubsub message signature - NATPortMap ## Force daemon to use NAT-PMP. - AutoNAT ## Force daemon to use AutoNAT. - AutoRelay ## Enables autorelay mode. - RelayActive ## Enables active mode for relay. - RelayDiscovery ## Enables passive discovery for relay. - RelayHop ## Enables hop for relay. - NoInlinePeerId ## Disable inlining of peer ID (not yet in #master). - NoProcessCtrl ## Process was not spawned. - - P2PStream* = ref object - flags*: set[P2PStreamFlags] - peer*: PeerId - raddress*: MultiAddress - protocol*: string - transp*: StreamTransport - - P2PServer = object - server*: StreamServer - address*: MultiAddress - - DaemonAPI* = ref object # pool*: TransportPool - flags*: set[P2PDaemonFlags] - address*: MultiAddress - pattern*: string - ucounter*: int - process*: Process - handlers*: Table[string, P2PStreamCallback] - servers*: seq[P2PServer] - userData*: RootRef - - PeerInfo* = object - peer*: PeerId - addresses*: seq[MultiAddress] - - PubsubTicket* = ref object - topic*: string - handler*: P2PPubSubCallback2 - transp*: StreamTransport - - PubSubMessage* = object - peer*: PeerId - data*: seq[byte] - seqno*: seq[byte] - topics*: seq[string] - signature*: Signature - key*: PublicKey - - P2PStreamCallback* = proc(api: DaemonAPI, stream: P2PStream): Future[void] {. - gcsafe, async: (raises: [CatchableError]) - .} - P2PPubSubCallback* = proc( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.gcsafe, raises: [CatchableError].} - P2PPubSubCallback2* = proc( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async: (raises: [CatchableError]).} - DaemonError* = object of LPError - DaemonRemoteError* = object of DaemonError - DaemonLocalError* = object of DaemonError - -var daemonsCount {.threadvar.}: int - -chronicles.formatIt(PeerInfo): - shortLog(it) - -proc requestIdentity(): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go - ## Processing function `doIdentify(req *pb.Request)`. - result = initProtoBuffer({WithVarintLength}) - result.write(1, safeConvert[uint](RequestType.IDENTIFY)) - result.finish() - -proc requestConnect( - peerid: PeerId, addresses: openArray[MultiAddress], timeout = 0 -): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go - ## Processing function `doConnect(req *pb.Request)`. - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, peerid) - for item in addresses: - msg.write(2, item.data.buffer) - if timeout > 0: - msg.write(3, hint64(timeout)) - result.write(1, safeConvert[uint](RequestType.CONNECT)) - result.write(2, msg) - result.finish() - -proc requestDisconnect(peerid: PeerId): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go - ## Processing function `doDisconnect(req *pb.Request)`. - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, peerid) - result.write(1, safeConvert[uint](RequestType.DISCONNECT)) - result.write(7, msg) - result.finish() - -proc requestStreamOpen( - peerid: PeerId, protocols: openArray[string], timeout = 0 -): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go - ## Processing function `doStreamOpen(req *pb.Request)`. - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, peerid) - for item in protocols: - msg.write(2, item) - if timeout > 0: - msg.write(3, hint64(timeout)) - result.write(1, safeConvert[uint](RequestType.STREAM_OPEN)) - result.write(3, msg) - result.finish() - -proc requestStreamHandler( - address: MultiAddress, protocols: openArray[MultiProtocol] -): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go - ## Processing function `doStreamHandler(req *pb.Request)`. - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, address.data.buffer) - for item in protocols: - msg.write(2, item) - result.write(1, safeConvert[uint](RequestType.STREAM_HANDLER)) - result.write(4, msg) - result.finish() - -proc requestListPeers(): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go - ## Processing function `doListPeers(req *pb.Request)` - result = initProtoBuffer({WithVarintLength}) - result.write(1, safeConvert[uint](RequestType.LIST_PEERS)) - result.finish() - -proc requestDHTFindPeer(peer: PeerId, timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTFindPeer(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.FIND_PEER) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(2, peer) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestDHTFindPeersConnectedToPeer(peer: PeerId, timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTFindPeersConnectedToPeer(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.FIND_PEERS_CONNECTED_TO_PEER) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(2, peer) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestDHTFindProviders(cid: Cid, count: uint32, timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTFindProviders(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.FIND_PROVIDERS) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(3, cid.data.buffer) - msg.write(6, count) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestDHTGetClosestPeers(key: string, timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTGetClosestPeers(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.GET_CLOSEST_PEERS) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(4, key) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestDHTGetPublicKey(peer: PeerId, timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTGetPublicKey(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.GET_PUBLIC_KEY) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(2, peer) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestDHTGetValue(key: string, timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTGetValue(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.GET_VALUE) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(4, key) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestDHTSearchValue(key: string, timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTSearchValue(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.SEARCH_VALUE) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(4, key) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestDHTPutValue(key: string, value: openArray[byte], timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTPutValue(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.PUT_VALUE) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(4, key) - msg.write(5, value) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, uint(RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestDHTProvide(cid: Cid, timeout = 0): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go - ## Processing function `doDHTProvide(req *pb.DHTRequest)`. - let msgid = safeConvert[uint](DHTRequestType.PROVIDE) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(3, cid.data.buffer) - if timeout > 0: - msg.write(7, hint64(timeout)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.DHT)) - result.write(5, msg) - result.finish() - -proc requestCMTagPeer(peer: PeerId, tag: string, weight: int): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/connmgr.go#L18 - let msgid = safeConvert[uint](ConnManagerRequestType.TAG_PEER) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(2, peer) - msg.write(3, tag) - msg.write(4, hint64(weight)) - msg.finish() - result.write(1, safeConvert[uint](RequestType.CONNMANAGER)) - result.write(6, msg) - result.finish() - -proc requestCMUntagPeer(peer: PeerId, tag: string): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/connmgr.go#L33 - let msgid = safeConvert[uint](ConnManagerRequestType.UNTAG_PEER) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(2, peer) - msg.write(3, tag) - msg.finish() - result.write(1, safeConvert[uint](RequestType.CONNMANAGER)) - result.write(6, msg) - result.finish() - -proc requestCMTrim(): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/connmgr.go#L47 - let msgid = safeConvert[uint](ConnManagerRequestType.TRIM) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.finish() - result.write(1, safeConvert[uint](RequestType.CONNMANAGER)) - result.write(6, msg) - result.finish() - -proc requestPSGetTopics(): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go - ## Processing function `doPubsubGetTopics(req *pb.PSRequest)`. - let msgid = safeConvert[uint](PSRequestType.GET_TOPICS) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.finish() - result.write(1, safeConvert[uint](RequestType.PUBSUB)) - result.write(8, msg) - result.finish() - -proc requestPSListPeers(topic: string): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go - ## Processing function `doPubsubListPeers(req *pb.PSRequest)`. - let msgid = safeConvert[uint](PSRequestType.LIST_PEERS) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(2, topic) - msg.finish() - result.write(1, safeConvert[uint](RequestType.PUBSUB)) - result.write(8, msg) - result.finish() - -proc requestPSPublish(topic: string, data: openArray[byte]): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go - ## Processing function `doPubsubPublish(req *pb.PSRequest)`. - let msgid = safeConvert[uint](PSRequestType.PUBLISH) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(2, topic) - msg.write(3, data) - msg.finish() - result.write(1, safeConvert[uint](RequestType.PUBSUB)) - result.write(8, msg) - result.finish() - -proc requestPSSubscribe(topic: string): ProtoBuffer = - ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go - ## Processing function `doPubsubSubscribe(req *pb.PSRequest)`. - let msgid = safeConvert[uint](PSRequestType.SUBSCRIBE) - result = initProtoBuffer({WithVarintLength}) - var msg = initProtoBuffer() - msg.write(1, msgid) - msg.write(2, topic) - msg.finish() - result.write(1, safeConvert[uint](RequestType.PUBSUB)) - result.write(8, msg) - result.finish() - -proc checkResponse(pb: ProtoBuffer): ResponseKind {.inline.} = - result = ResponseKind.Malformed - var value: uint64 - if getRequiredField(pb, 1, value).isOk(): - if value == 0: - result = ResponseKind.Success - else: - result = ResponseKind.Error - -proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [DaemonLocalError].} = - var error: seq[byte] - if pb.getRequiredField(ResponseType.ERROR.int, error).isOk(): - if initProtoBuffer(error).getRequiredField(1, result).isErr(): - raise newException(DaemonLocalError, "Error message is missing!") - -proc recvMessage( - conn: StreamTransport -): Future[seq[byte]] {. - async: (raises: [TransportIncompleteError, TransportError, CancelledError]) -.} = - var - size: uint - length: int - res: VarintResult[void] - var buffer = newSeqUninit[byte](10) - try: - for i in 0 ..< len(buffer): - await conn.readExactly(addr buffer[i], 1) - res = PB.getUVarint(buffer.toOpenArray(0, i), length, size) - if res.isOk(): - break - if res.isErr() or size > 1'u shl 22: - buffer.setLen(0) - result = buffer - return - buffer.setLen(size) - await conn.readExactly(addr buffer[0], int(size)) - except TransportIncompleteError: - buffer.setLen(0) - - result = buffer - -proc newConnection*( - api: DaemonAPI -): Future[StreamTransport] {. - async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]) -.} = - await connect(api.address) - -proc closeConnection*( - api: DaemonAPI, transp: StreamTransport -): Future[void] {.async: (raises: [CancelledError]).} = - await transp.closeWait() - -proc socketExists(address: MultiAddress): Future[bool] {.async: (raises: []).} = - try: - var transp = await connect(address) - await transp.closeWait() - result = true - except CatchableError: - result = false - -when defined(windows): - proc getCurrentProcessId(): uint32 {. - stdcall, dynlib: "kernel32", importc: "GetCurrentProcessId" - .} - - proc getProcessId(): int = - result = cast[int](getCurrentProcessId()) - -else: - proc getProcessId(): int = - result = int(posix.getpid()) - -proc getSocket( - pattern: string, count: ptr int -): Future[MultiAddress] {.async: (raises: [ValueError, LPError]).} = - var sockname = "" - var pid = $getProcessId() - sockname = pattern % [pid, $(count[])] - let tmpma = MultiAddress.init(sockname).tryGet() - - if UNIX.match(tmpma): - while true: - count[] = count[] + 1 - sockname = pattern % [pid, $(count[])] - var ma = MultiAddress.init(sockname).tryGet() - let res = await socketExists(ma) - if not res: - result = ma - break - elif TCP.match(tmpma): - sockname = pattern % [pid, "0"] - var ma = MultiAddress.init(sockname).tryGet() - var sock = createAsyncSocket(ma) - if sock.bindAsyncSocket(ma): - # Socket was successfully bound, then its free to use - count[] = count[] + 1 - var ta = sock.getLocalAddress() - sockname = pattern % [pid, $ta.port] - result = MultiAddress.init(sockname).tryGet() - closeSocket(sock) - -# This is forward declaration needed for newDaemonApi() -proc listPeers*( - api: DaemonAPI -): Future[seq[PeerInfo]] {. - async: ( - raises: [ - ValueError, DaemonLocalError, OSError, MaInvalidAddress, TransportError, - CancelledError, LPError, - ] - ) -.} - -template exceptionToAssert(body: untyped): untyped = - block: - var res: type(body) - when defined(nimHasWarnBareExcept): - {.push warning[BareExcept]: off.} - try: - res = body - except OSError as exc: - raise newException(OSError, "failure in exceptionToAssert: " & exc.msg, exc) - except IOError as exc: - raise newException(IOError, "failure in exceptionToAssert: " & exc.msg, exc) - except Defect as exc: - raise newException(Defect, "failure in exceptionToAssert: " & exc.msg, exc) - except Exception as exc: - raiseAssert "Exception captured in exceptionToAssert: " & exc.msg - when defined(nimHasWarnBareExcept): - {.pop.} - res - -proc copyEnv(): StringTableRef = - ## This procedure copy all environment variables into StringTable. - result = newStringTable(modeStyleInsensitive) - for key, val in envPairs(): - result[key] = val - -proc newDaemonApi*( - flags: set[P2PDaemonFlags] = {}, - bootstrapNodes: seq[string] = @[], - id: string = "", - hostAddresses: seq[MultiAddress] = @[], - announcedAddresses: seq[MultiAddress] = @[], - daemon = DefaultDaemonFile, - sockpath = "", - patternSock = "", - patternHandler = "", - poolSize = 10, - gossipsubHeartbeatInterval = 0, - gossipsubHeartbeatDelay = 0, - peersRequired = 2, - logFile = "", - logLevel = IpfsLogLevel.Debug, -): Future[DaemonAPI] {. - async: ( - raises: [ - ValueError, DaemonLocalError, CancelledError, LPError, OSError, IOError, - AsyncError, - ] - ) -.} = - ## Initialize connection to `go-libp2p-daemon` control socket. - ## - ## ``flags`` - set of P2PDaemonFlags. - ## - ## ``bootstrapNodes`` - list of bootnode's addresses in MultiAddress format. - ## (default: @[], which means usage of default nodes inside of - ## `go-libp2p-daemon`). - ## - ## ``id`` - path to file with identification information (default: "" which - ## means - generate new random identity). - ## - ## ``hostAddresses`` - list of multiaddrs the host should listen on. - ## (default: @[], the daemon will pick a listening port at random). - ## - ## ``announcedAddresses`` - list of multiaddrs the host should announce to - ## the network (default: @[], the daemon will announce its own listening - ## address). - ## - ## ``daemon`` - name of ``go-libp2p-daemon`` executable (default: "p2pd"). - ## - ## ``sockpath`` - default control socket MultiAddress - ## (default: "/unix/tmp/p2pd.sock"). - ## - ## ``patternSock`` - MultiAddress pattern string, used to start multiple - ## daemons (default on Unix: "/unix/tmp/nim-p2pd-$1-$2.sock", on Windows: - ## "/ip4/127.0.0.1/tcp/$2"). - ## - ## ``patternHandler`` - MultiAddress pattern string, used to establish - ## incoming channels (default on Unix: "/unix/tmp/nim-p2pd-handle-$1-$2.sock", - ## on Windows: "/ip4/127.0.0.1/tcp/$2"). - ## - ## ``poolSize`` - size of connections pool (default: 10). - ## - ## ``gossipsubHeartbeatInterval`` - GossipSub protocol heartbeat interval in - ## milliseconds (default: 0, use default `go-libp2p-daemon` values). - ## - ## ``gossipsubHeartbeatDelay`` - GossipSub protocol heartbeat delay in - ## millseconds (default: 0, use default `go-libp2p-daemon` values). - ## - ## ``peersRequired`` - Wait until `go-libp2p-daemon` will connect to at least - ## ``peersRequired`` peers before return from `newDaemonApi()` procedure - ## (default: 2). - ## - ## ``logFile`` - Enable ``go-libp2p-daemon`` logging and store it to file - ## ``logFile`` (default: "", no logging) - ## - ## ``logLevel`` - Set ``go-libp2p-daemon`` logging verbosity level to - ## ``logLevel`` (default: Debug) - var api = new DaemonAPI - var args = newSeq[string]() - var env: StringTableRef - - when defined(windows): - var patternForSocket = - if len(patternSock) > 0: patternSock else: DefaultIpSocketPattern - var patternForChild = - if len(patternHandler) > 0: patternHandler else: DefaultIpChildPattern - else: - var patternForSocket = - if len(patternSock) > 0: patternSock else: DefaultUnixSocketPattern - var patternForChild = - if len(patternHandler) > 0: patternHandler else: DefaultUnixChildPattern - - api.flags = flags - api.servers = newSeq[P2PServer]() - api.pattern = patternForChild - api.ucounter = 1 - - if len(sockpath) == 0: - api.flags.excl(NoProcessCtrl) - api.address = await getSocket(patternForSocket, addr daemonsCount) - else: - api.address = MultiAddress.init(sockpath).tryGet() - api.flags.incl(NoProcessCtrl) - let res = await socketExists(api.address) - if not res: - raise newException(DaemonLocalError, "Could not connect to remote daemon") - result = api - return - - # DHTFull and DHTClient could not be present at the same time - if DHTFull in flags and DHTClient in flags: - api.flags.excl(DHTClient) - # PSGossipSub and PSFloodSub could not be present at the same time - if PSGossipSub in flags and PSFloodSub in flags: - api.flags.excl(PSFloodSub) - if DHTFull in api.flags: - args.add("-dht") - if DHTClient in api.flags: - args.add("-dhtClient") - if {Bootstrap, WaitBootstrap} * api.flags != {}: - args.add("-b") - if len(logFile) != 0: - env = copyEnv() - env["IPFS_LOGGING_FMT"] = "nocolor" - env["GOLOG_FILE"] = logFile - case logLevel - of IpfsLogLevel.Critical: - env["IPFS_LOGGING"] = "CRITICAL" - of IpfsLogLevel.Error: - env["IPFS_LOGGING"] = "ERROR" - of IpfsLogLevel.Warning: - env["IPFS_LOGGING"] = "WARNING" - of IpfsLogLevel.Notice: - env["IPFS_LOGGING"] = "NOTICE" - of IpfsLogLevel.Info: - env["IPFS_LOGGING"] = "INFO" - of IpfsLogLevel.Debug: - env["IPFS_LOGGING"] = "DEBUG" - of IpfsLogLevel.Trace: - env["IPFS_LOGGING"] = "DEBUG" - env["GOLOG_TRACING_FILE"] = logFile - if PSGossipSub in api.flags: - args.add("-pubsub") - args.add("-pubsubRouter=gossipsub") - if gossipsubHeartbeatInterval != 0: - let param = $gossipsubHeartbeatInterval & "ms" - args.add("-gossipsubHeartbeatInterval=" & param) - if gossipsubHeartbeatDelay != 0: - let param = $gossipsubHeartbeatDelay & "ms" - args.add("-gossipsubHeartbeatInitialDelay=" & param) - if PSFloodSub in api.flags: - args.add("-pubsub") - args.add("-pubsubRouter=floodsub") - if api.flags * {PSFloodSub, PSGossipSub} != {}: - if PSNoSign in api.flags: - args.add("-pubsubSign=false") - if PSStrictSign in api.flags: - args.add("-pubsubSignStrict=true") - if NATPortMap in api.flags: - args.add("-natPortMap=true") - if AutoNAT in api.flags: - args.add("-autonat=true") - if AutoRelay in api.flags: - args.add("-autoRelay=true") - if RelayActive in api.flags: - args.add("-relayActive=true") - if RelayDiscovery in api.flags: - args.add("-relayDiscovery=true") - if RelayHop in api.flags: - args.add("-relayHop=true") - if NoInlinePeerId in api.flags: - args.add("-noInlinePeerId=true") - if len(bootstrapNodes) > 0: - args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) - if len(id) != 0: - args.add("-id=" & id) - if len(hostAddresses) > 0: - var opt = "-hostAddrs=" - for i, address in hostAddresses: - if i > 0: - opt.add "," - opt.add $address - args.add(opt) - if len(announcedAddresses) > 0: - var opt = "-announceAddrs=" - for i, address in announcedAddresses: - if i > 0: - opt.add "," - opt.add $address - args.add(opt) - args.add("-noise=true") - args.add("-quic=false") - args.add("-listen=" & $api.address) - - # We are trying to get absolute daemon path. - let cmd = findExe(daemon) - trace "p2pd cmd", cmd, args - if len(cmd) == 0: - raise newException(DaemonLocalError, "Could not find daemon executable!") - - # Starting daemon process - # echo "Starting ", cmd, " ", args.join(" ") - api.process = exceptionToAssert: - startProcess(cmd, "", args, env, {poParentStreams}) - # Waiting until daemon will not be bound to control socket. - while true: - if not api.process.running(): - raise newException(DaemonLocalError, "Daemon executable could not be started!") - let res = await socketExists(api.address) - if res: - break - await sleepAsync(500.milliseconds) - - if WaitBootstrap in api.flags: - while true: - var peers = await listPeers(api) - if len(peers) >= peersRequired: - break - await sleepAsync(1.seconds) - - result = api - -proc close*(stream: P2PStream) {.async: (raises: [DaemonLocalError]).} = - ## Close ``stream``. - if P2PStreamFlags.Closed notin stream.flags: - await stream.transp.closeWait() - stream.transp = nil - stream.flags.incl(P2PStreamFlags.Closed) - else: - raise newException(DaemonLocalError, "Stream is already closed!") - -proc close*( - api: DaemonAPI -) {.async: (raises: [TransportOsError, LPError, ValueError, OSError, CancelledError]).} = - ## Shutdown connections to `go-libp2p-daemon` control socket. - # await api.pool.close() - # Closing all pending servers. - if len(api.servers) > 0: - var pending = newSeq[Future[void]]() - for server in api.servers: - server.server.stop() - server.server.close() - pending.add(server.server.join()) - await allFutures(pending) - for server in api.servers: - let address = initTAddress(server.address).tryGet() - discard tryRemoveFile($address) - api.servers.setLen(0) - # Closing daemon's process. - if NoProcessCtrl notin api.flags: - when defined(windows): - api.process.kill() - else: - api.process.terminate() - discard api.process.waitForExit() - # Attempt to delete unix socket endpoint. - let address = initTAddress(api.address).tryGet() - if address.family == AddressFamily.Unix: - discard tryRemoveFile($address) - -template withMessage(m, body: untyped): untyped = - let kind = m.checkResponse() - if kind == ResponseKind.Error: - raise newException(DaemonRemoteError, m.getErrorMessage()) - elif kind == ResponseKind.Malformed: - raise newException(DaemonLocalError, "Malformed message received!") - else: - body - -proc transactMessage( - transp: StreamTransport, pb: ProtoBuffer -): Future[ProtoBuffer] {. - async: (raises: [DaemonLocalError, TransportError, CancelledError]) -.} = - let length = pb.getLen() - let res = await transp.write(pb.getPtr(), length) - if res != length: - raise newException(DaemonLocalError, "Could not send message to daemon!") - var message = await transp.recvMessage() - if len(message) == 0: - raise newException(DaemonLocalError, "Incorrect or empty message received!") - result = initProtoBuffer(message) - -proc getPeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [DaemonLocalError].} = - ## Get PeerInfo object from ``pb``. - result.addresses = newSeq[MultiAddress]() - if pb.getRequiredField(1, result.peer).isErr(): - raise newException(DaemonLocalError, "Incorrect or empty message received!") - - discard pb.getRepeatedField(2, result.addresses) - -proc identity*( - api: DaemonAPI -): Future[PeerInfo] {. - async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]) -.} = - ## Get Node identity information - var transp = await api.newConnection() - try: - var pb = await transactMessage(transp, requestIdentity()) - pb.withMessage: - var res: seq[byte] - if pb.getRequiredField(ResponseType.IDENTITY.int, res).isOk(): - var resPb = initProtoBuffer(res) - result = getPeerInfo(resPb) - finally: - await api.closeConnection(transp) - -proc connect*( - api: DaemonAPI, peer: PeerId, addresses: seq[MultiAddress], timeout = 0 -) {.async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]).} = - ## Connect to remote peer with id ``peer`` and addresses ``addresses``. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestConnect(peer, addresses, timeout)) - pb.withMessage: - discard - except CatchableError: - await api.closeConnection(transp) - -proc disconnect*( - api: DaemonAPI, peer: PeerId -) {.async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]).} = - ## Disconnect from remote peer with id ``peer``. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestDisconnect(peer)) - pb.withMessage: - discard - finally: - await api.closeConnection(transp) - -proc openStream*( - api: DaemonAPI, peer: PeerId, protocols: seq[string], timeout = 0 -): Future[P2PStream] {. - async: ( - raises: - [MaInvalidAddress, TransportError, CancelledError, LPError, DaemonLocalError] - ) -.} = - ## Open new stream to peer ``peer`` using one of the protocols in - ## ``protocols``. Returns ``StreamTransport`` for the stream. - var transp = await api.newConnection() - var stream = new P2PStream - try: - var pb = await transp.transactMessage(requestStreamOpen(peer, protocols, timeout)) - pb.withMessage: - var res: seq[byte] - if pb.getRequiredField(ResponseType.STREAMINFO.int, res).isOk(): - let resPb = initProtoBuffer(res) - var raddress = newSeqUninit[byte](0) - stream.protocol = "" - resPb.getRequiredField(1, stream.peer).tryGet() - resPb.getRequiredField(2, raddress).tryGet() - stream.raddress = MultiAddress.init(raddress).tryGet() - resPb.getRequiredField(3, stream.protocol).tryGet() - stream.flags.incl(Outbound) - stream.transp = transp - result = stream - except ResultError[ProtoError] as e: - await api.closeConnection(transp) - raise newException(DaemonLocalError, "Wrong message type: " & e.msg, e) - -proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = - # must not specify raised exceptions as this is StreamCallback from chronos - var api = getUserData[DaemonAPI](server) - var message = await transp.recvMessage() - var pb = initProtoBuffer(message) - var stream = new P2PStream - var raddress = newSeqUninit[byte](0) - stream.protocol = "" - pb.getRequiredField(1, stream.peer).tryGet() - pb.getRequiredField(2, raddress).tryGet() - stream.raddress = MultiAddress.init(raddress).tryGet() - pb.getRequiredField(3, stream.protocol).tryGet() - stream.flags.incl(Inbound) - stream.transp = transp - if len(stream.protocol) > 0: - var handler = api.handlers.getOrDefault(stream.protocol) - if not isNil(handler): - asyncSpawn handler(api, stream) - -proc addHandler*( - api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback -) {. - async: ( - raises: [ - MaInvalidAddress, DaemonLocalError, TransportError, CancelledError, LPError, - ValueError, - ] - ) -.} = - ## Add stream handler ``handler`` for set of protocols ``protocols``. - var transp = await api.newConnection() - let maddress = await getSocket(api.pattern, addr api.ucounter) - var server = createStreamServer(maddress, streamHandler, udata = api) - - var removeHandler = proc(): Future[void] {. - async: (raises: [CancelledError, TransportError]) - .} = - for item in protocols: - api.handlers.del(item) - server.stop() - server.close() - await server.join() - - try: - for item in protocols: - api.handlers[item] = handler - server.start() - var pb = await transp.transactMessage(requestStreamHandler(maddress, protocols)) - pb.withMessage: - api.servers.add(P2PServer(server: server, address: maddress)) - except DaemonLocalError as e: - await removeHandler() - raise newException(DaemonLocalError, "Could not add stream handler: " & e.msg, e) - except TransportError as e: - await removeHandler() - raise newException(TransportError, "Could not add stream handler: " & e.msg, e) - except CancelledError as e: - await removeHandler() - raise e - finally: - await api.closeConnection(transp) - -proc listPeers*( - api: DaemonAPI -): Future[seq[PeerInfo]] {. - async: ( - raises: [ - ValueError, DaemonLocalError, OSError, MaInvalidAddress, TransportError, - CancelledError, LPError, - ] - ) -.} = - ## Get list of remote peers to which we are currently connected. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestListPeers()) - pb.withMessage: - result = newSeq[PeerInfo]() - var ress: seq[seq[byte]] - if pb.getRequiredRepeatedField(ResponseType.PEERINFO.int, ress).isOk(): - for p in ress: - let peer = initProtoBuffer(p).getPeerInfo() - result.add(peer) - finally: - await api.closeConnection(transp) - -proc cmTagPeer*( - api: DaemonAPI, peer: PeerId, tag: string, weight: int -) {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Tag peer with id ``peer`` using ``tag`` and ``weight``. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestCMTagPeer(peer, tag, weight)) - withMessage(pb): - discard - finally: - await api.closeConnection(transp) - -proc cmUntagPeer*( - api: DaemonAPI, peer: PeerId, tag: string -) {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Remove tag ``tag`` from peer with id ``peer``. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestCMUntagPeer(peer, tag)) - withMessage(pb): - discard - finally: - await api.closeConnection(transp) - -proc cmTrimPeers*( - api: DaemonAPI -) {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Trim all connections. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestCMTrim()) - withMessage(pb): - discard - finally: - await api.closeConnection(transp) - -proc dhtGetSinglePeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [DaemonLocalError].} = - var res: seq[byte] - if pb.getRequiredField(2, res).isOk(): - result = initProtoBuffer(res).getPeerInfo() - else: - raise newException(DaemonLocalError, "Missing required field `peer`!") - -proc dhtGetSingleValue(pb: ProtoBuffer): seq[byte] {.raises: [DaemonLocalError].} = - result = newSeqUninit[byte](0) - if pb.getRequiredField(3, result).isErr(): - raise newException(DaemonLocalError, "Missing field `value`!") - -proc dhtGetSinglePublicKey(pb: ProtoBuffer): PublicKey {.raises: [DaemonLocalError].} = - if pb.getRequiredField(3, result).isErr(): - raise newException(DaemonLocalError, "Missing field `value`!") - -proc dhtGetSinglePeerId(pb: ProtoBuffer): PeerId {.raises: [DaemonLocalError].} = - if pb.getRequiredField(3, result).isErr(): - raise newException(DaemonLocalError, "Missing field `value`!") - -proc enterDhtMessage( - pb: ProtoBuffer, rt: DHTResponseType -): ProtoBuffer {.inline, raises: [DaemonLocalError].} = - var dhtResponse: seq[byte] - if pb.getRequiredField(ResponseType.DHT.int, dhtResponse).isOk(): - var pbDhtResponse = initProtoBuffer(dhtResponse) - var dtype: uint - if pbDhtResponse.getRequiredField(1, dtype).isErr(): - raise newException(DaemonLocalError, "Missing required DHT field `type`!") - if dtype != safeConvert[uint](rt): - raise newException(DaemonLocalError, "Wrong DHT answer type! ") - - var value: seq[byte] - if pbDhtResponse.getRequiredField(3, value).isErr(): - raise newException(DaemonLocalError, "Missing required DHT field `value`!") - - return initProtoBuffer(value) - else: - raise newException(DaemonLocalError, "Wrong message type!") - -proc enterPsMessage( - pb: ProtoBuffer -): ProtoBuffer {.inline, raises: [DaemonLocalError].} = - var res: seq[byte] - if pb.getRequiredField(ResponseType.PUBSUB.int, res).isErr(): - raise newException(DaemonLocalError, "Wrong message type!") - - initProtoBuffer(res) - -proc getDhtMessageType( - pb: ProtoBuffer -): DHTResponseType {.inline, raises: [DaemonLocalError].} = - var dtype: uint - if pb.getRequiredField(1, dtype).isErr(): - raise newException(DaemonLocalError, "Missing required DHT field `type`!") - if dtype == safeConvert[uint](DHTResponseType.VALUE): - result = DHTResponseType.VALUE - elif dtype == safeConvert[uint](DHTResponseType.END): - result = DHTResponseType.END - else: - raise newException(DaemonLocalError, "Wrong DHT answer type!") - -proc dhtFindPeer*( - api: DaemonAPI, peer: PeerId, timeout = 0 -): Future[PeerInfo] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Find peer with id ``peer`` and return peer information ``PeerInfo``. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestDHTFindPeer(peer, timeout)) - withMessage(pb): - result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSinglePeerInfo() - finally: - await api.closeConnection(transp) - -proc dhtGetPublicKey*( - api: DaemonAPI, peer: PeerId, timeout = 0 -): Future[PublicKey] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Get peer's public key from peer with id ``peer``. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestDHTGetPublicKey(peer, timeout)) - withMessage(pb): - result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSinglePublicKey() - finally: - await api.closeConnection(transp) - -proc dhtGetValue*( - api: DaemonAPI, key: string, timeout = 0 -): Future[seq[byte]] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Get value associated with ``key``. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestDHTGetValue(key, timeout)) - withMessage(pb): - result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSingleValue() - finally: - await api.closeConnection(transp) - -proc dhtPutValue*( - api: DaemonAPI, key: string, value: seq[byte], timeout = 0 -) {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Associate ``value`` with ``key``. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestDHTPutValue(key, value, timeout)) - withMessage(pb): - discard - finally: - await api.closeConnection(transp) - -proc dhtProvide*( - api: DaemonAPI, cid: Cid, timeout = 0 -) {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Provide content with id ``cid``. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestDHTProvide(cid, timeout)) - withMessage(pb): - discard - finally: - await api.closeConnection(transp) - -proc dhtFindPeersConnectedToPeer*( - api: DaemonAPI, peer: PeerId, timeout = 0 -): Future[seq[PeerInfo]] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Find peers which are connected to peer with id ``peer``. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - var list = newSeq[PeerInfo]() - try: - let spb = requestDHTFindPeersConnectedToPeer(peer, timeout) - var pb = await transp.transactMessage(spb) - withMessage(pb): - discard pb.enterDhtMessage(DHTResponseType.BEGIN) - while true: - var message = await transp.recvMessage() - if len(message) == 0: - break - var cpb = initProtoBuffer(message) - if cpb.getDhtMessageType() == DHTResponseType.END: - break - list.add(cpb.dhtGetSinglePeerInfo()) - result = list - finally: - await api.closeConnection(transp) - -proc dhtGetClosestPeers*( - api: DaemonAPI, key: string, timeout = 0 -): Future[seq[PeerId]] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Get closest peers for ``key``. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - var list = newSeq[PeerId]() - try: - let spb = requestDHTGetClosestPeers(key, timeout) - var pb = await transp.transactMessage(spb) - withMessage(pb): - discard pb.enterDhtMessage(DHTResponseType.BEGIN) - while true: - var message = await transp.recvMessage() - if len(message) == 0: - break - var cpb = initProtoBuffer(message) - if cpb.getDhtMessageType() == DHTResponseType.END: - break - list.add(cpb.dhtGetSinglePeerId()) - result = list - finally: - await api.closeConnection(transp) - -proc dhtFindProviders*( - api: DaemonAPI, cid: Cid, count: uint32, timeout = 0 -): Future[seq[PeerInfo]] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Get ``count`` providers for content with id ``cid``. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - var list = newSeq[PeerInfo]() - try: - let spb = requestDHTFindProviders(cid, count, timeout) - var pb = await transp.transactMessage(spb) - withMessage(pb): - discard pb.enterDhtMessage(DHTResponseType.BEGIN) - while true: - var message = await transp.recvMessage() - if len(message) == 0: - break - var cpb = initProtoBuffer(message) - if cpb.getDhtMessageType() == DHTResponseType.END: - break - list.add(cpb.dhtGetSinglePeerInfo()) - result = list - finally: - await api.closeConnection(transp) - -proc dhtSearchValue*( - api: DaemonAPI, key: string, timeout = 0 -): Future[seq[seq[byte]]] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Search for value with ``key``, return list of values found. - ## - ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value - ## means no timeout. - var transp = await api.newConnection() - var list = newSeq[seq[byte]]() - try: - var pb = await transp.transactMessage(requestDHTSearchValue(key, timeout)) - withMessage(pb): - discard pb.enterDhtMessage(DHTResponseType.BEGIN) - while true: - var message = await transp.recvMessage() - if len(message) == 0: - break - var cpb = initProtoBuffer(message) - if cpb.getDhtMessageType() == DHTResponseType.END: - break - list.add(cpb.dhtGetSingleValue()) - result = list - finally: - await api.closeConnection(transp) - -proc pubsubGetTopics*( - api: DaemonAPI -): Future[seq[string]] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Get list of topics this node is subscribed to. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestPSGetTopics()) - withMessage(pb): - let innerPb = pb.enterPsMessage() - var topics = newSeq[string]() - discard innerPb.getRepeatedField(1, topics) - result = topics - finally: - await api.closeConnection(transp) - -proc pubsubListPeers*( - api: DaemonAPI, topic: string -): Future[seq[PeerId]] {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Get list of peers we are connected to and which also subscribed to topic - ## ``topic``. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestPSListPeers(topic)) - withMessage(pb): - var peer: PeerId - let innerPb = pb.enterPsMessage() - var peers = newSeq[seq[byte]]() - discard innerPb.getRepeatedField(2, peers) - result = peers.mapIt(PeerId.init(it).get()) - finally: - await api.closeConnection(transp) - -proc pubsubPublish*( - api: DaemonAPI, topic: string, value: seq[byte] -) {. - async: ( - raises: - [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] - ) -.} = - ## Get list of peer identifiers which are subscribed to topic ``topic``. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestPSPublish(topic, value)) - withMessage(pb): - discard - finally: - await api.closeConnection(transp) - -proc getPubsubMessage*(pb: ProtoBuffer): PubSubMessage = - result.data = newSeqUninit[byte](0) - result.seqno = newSeqUninit[byte](0) - discard pb.getField(1, result.peer) - discard pb.getField(2, result.data) - discard pb.getField(3, result.seqno) - discard pb.getRepeatedField(4, result.topics) - discard pb.getField(5, result.signature) - discard pb.getField(6, result.key) - -proc pubsubLoop( - api: DaemonAPI, ticket: PubsubTicket -) {. - async: ( - raises: [TransportIncompleteError, TransportError, CancelledError, CatchableError] - ) -.} = - while true: - var pbmessage = await ticket.transp.recvMessage() - if len(pbmessage) == 0: - break - var pb = initProtoBuffer(pbmessage) - var message = pb.getPubsubMessage() - ## We can do here `await` too - let res = await ticket.handler(api, ticket, message) - if not res: - ticket.transp.close() - await ticket.transp.join() - break - -proc pubsubSubscribe*( - api: DaemonAPI, topic: string, handler: P2PPubSubCallback2 -): Future[PubsubTicket] {. - async: ( - raises: - [MaInvalidAddress, TransportError, LPError, CancelledError, DaemonLocalError] - ) -.} = - ## Subscribe to topic ``topic``. - var transp = await api.newConnection() - try: - var pb = await transp.transactMessage(requestPSSubscribe(topic)) - pb.withMessage: - var ticket = new PubsubTicket - ticket.topic = topic - ticket.handler = handler - ticket.transp = transp - asyncSpawn pubsubLoop(api, ticket) - result = ticket - except DaemonLocalError as exc: - await api.closeConnection(transp) - raise newException( - DaemonLocalError, "Could not subscribe to topic '" & topic & "': " & exc.msg, exc - ) - except TransportError as exc: - await api.closeConnection(transp) - raise newException( - TransportError, "Could not subscribe to topic '" & topic & "': " & exc.msg, exc - ) - except CancelledError as exc: - await api.closeConnection(transp) - raise exc - -proc pubsubSubscribe*( - api: DaemonAPI, topic: string, handler: P2PPubSubCallback -): Future[PubsubTicket] {. - async: (raises: [CatchableError]), deprecated: "Use P2PPubSubCallback2 instead" -.} = - proc wrap( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async: (raises: [CatchableError]).} = - await handler(api, ticket, message) - - await pubsubSubscribe(api, topic, wrap) - -proc shortLog*(pinfo: PeerInfo): string = - ## Get string representation of ``PeerInfo`` object. - result = newStringOfCap(128) - result.add("{PeerId: '") - result.add($pinfo.peer.shortLog()) - result.add("' Addresses: [") - let length = len(pinfo.addresses) - for i in 0 ..< length: - result.add("'") - result.add($pinfo.addresses[i]) - result.add("'") - if i < length - 1: - result.add(", ") - result.add("]}") - if len(pinfo.addresses) > 0: - result = result diff --git a/libp2p/daemon/transpool.nim b/libp2p/daemon/transpool.nim deleted file mode 100644 index 52de06ffb..000000000 --- a/libp2p/daemon/transpool.nim +++ /dev/null @@ -1,156 +0,0 @@ -# Nim-Libp2p -# Copyright (c) 2023 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -# * MIT license ([LICENSE-MIT](LICENSE-MIT)) -# at your option. -# This file may not be copied, modified, or distributed except according to -# those terms. - -{.push raises: [].} - -## This module implements Pool of StreamTransport. -import chronos - -const DefaultPoolSize* = 8 ## Default pool size - -type - ConnectionFlags = enum - None - Busy - - PoolItem = object - transp*: StreamTransport - flags*: set[ConnectionFlags] - - PoolState = enum - Connecting - Connected - Closing - Closed - - TransportPool* = ref object ## Transports pool object - transports: seq[PoolItem] - busyCount: int - state: PoolState - bufferSize: int - event: AsyncEvent - - TransportPoolError* = object of AsyncError - -proc waitAll[T](futs: seq[Future[T]]): Future[void] = - ## Performs waiting for all Future[T]. - var counter = len(futs) - var retFuture = newFuture[void]("connpool.waitAllConnections") - proc cb(udata: pointer) = - dec(counter) - if counter == 0: - retFuture.complete() - - for fut in futs: - fut.addCallback(cb) - return retFuture - -proc newPool*( - address: TransportAddress, - poolsize: int = DefaultPoolSize, - bufferSize = DefaultStreamBufferSize, -): Future[TransportPool] {.async: (raises: [CancelledError]).} = - ## Establish pool of connections to address ``address`` with size - ## ``poolsize``. - var pool = new TransportPool - pool.bufferSize = bufferSize - pool.transports = newSeq[PoolItem](poolsize) - var conns = newSeq[Future[StreamTransport]](poolsize) - pool.state = Connecting - for i in 0 ..< poolsize: - conns[i] = connect(address, bufferSize) - # Waiting for all connections to be established. - await waitAll(conns) - # Checking connections and preparing pool. - for i in 0 ..< poolsize: - if conns[i].failed: - raise conns[i].error - else: - let transp = conns[i].read() - let item = PoolItem(transp: transp) - pool.transports[i] = item - # Setup available connections event - pool.event = newAsyncEvent() - pool.state = Connected - result = pool - -proc acquire*( - pool: TransportPool -): Future[StreamTransport] {.async: (raises: [CancelledError, TransportPoolError]).} = - ## Acquire non-busy connection from pool ``pool``. - var transp: StreamTransport - if pool.state in {Connected}: - while true: - if pool.busyCount < len(pool.transports): - for conn in pool.transports.mitems(): - if Busy notin conn.flags: - conn.flags.incl(Busy) - inc(pool.busyCount) - transp = conn.transp - break - else: - await pool.event.wait() - pool.event.clear() - - if not isNil(transp): - break - else: - raise newException(TransportPoolError, "Pool is not ready!") - result = transp - -proc release*( - pool: TransportPool, transp: StreamTransport -) {.async: (raises: [TransportPoolError]).} = - ## Release connection ``transp`` back to pool ``pool``. - if pool.state in {Connected, Closing}: - var found = false - for conn in pool.transports.mitems(): - if conn.transp == transp: - conn.flags.excl(Busy) - dec(pool.busyCount) - pool.event.fire() - found = true - break - if not found: - raise newException(TransportPoolError, "Transport not bound to pool!") - else: - raise newException(TransportPoolError, "Pool is not ready!") - -proc join*( - pool: TransportPool -) {.async: (raises: [TransportPoolError, CancelledError]).} = - ## Waiting for all connection to become available. - if pool.state in {Connected, Closing}: - while true: - if pool.busyCount == 0: - break - else: - await pool.event.wait() - pool.event.clear() - elif pool.state == Connecting: - raise newException(TransportPoolError, "Pool is not ready!") - -proc close*( - pool: TransportPool -) {.async: (raises: [TransportPoolError, CancelledError]).} = - ## Closes transports pool ``pool`` and release all resources. - if pool.state == Connected: - pool.state = Closing - # Waiting for all transports to become available. - await pool.join() - # Closing all transports - var pending = newSeq[Future[void]](len(pool.transports)) - for i in 0 ..< len(pool.transports): - let transp = pool.transports[i].transp - transp.close() - pending[i] = transp.join() - # Waiting for all transports to be closed - await waitAll(pending) - # Mark pool as `Closed`. - pool.state = Closed diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 7f3a4f963..0bfa48744 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -150,57 +150,6 @@ proc createStreamServer*[T]( except CatchableError as exc: raise newException(LPError, "failed simpler createStreamServer: " & exc.msg, exc) -proc createAsyncSocket*(ma: MultiAddress): AsyncFD {.raises: [ValueError, LPError].} = - ## Create new asynchronous socket using MultiAddress' ``ma`` socket type and - ## protocol information. - ## - ## Returns ``asyncInvalidSocket`` on error. - ## - ## Note: This procedure only used in `go-libp2p-daemon` wrapper. - ## - - var - socktype: SockType = SockType.SOCK_STREAM - protocol: Protocol = Protocol.IPPROTO_TCP - - let address = initTAddress(ma).tryGet() - if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}: - if ma[1].tryGet().protoCode().tryGet() == multiCodec("udp"): - socktype = SockType.SOCK_DGRAM - protocol = Protocol.IPPROTO_UDP - elif ma[1].tryGet().protoCode().tryGet() == multiCodec("tcp"): - socktype = SockType.SOCK_STREAM - protocol = Protocol.IPPROTO_TCP - elif address.family in {AddressFamily.Unix}: - socktype = SockType.SOCK_STREAM - protocol = cast[Protocol](0) - else: - return asyncInvalidSocket - - try: - createAsyncSocket(address.getDomain(), socktype, protocol) - except CatchableError as exc: - raise newException( - LPError, "Convert exception to LPError in createAsyncSocket: " & exc.msg, exc - ) - -proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool {.raises: [LPError].} = - ## Bind socket ``sock`` to MultiAddress ``ma``. - ## - ## Note: This procedure only used in `go-libp2p-daemon` wrapper. - ## - - var - saddr: Sockaddr_storage - slen: SockLen - - let address = initTAddress(ma).tryGet() - toSAddr(address, saddr, slen) - if bindSocket(SocketHandle(sock), cast[ptr SockAddr](addr saddr), slen) == 0: - result = true - else: - result = false - proc getLocalAddress*(sock: AsyncFD): TransportAddress = ## Retrieve local socket ``sock`` address. ## diff --git a/scripts/build_p2pd.sh b/scripts/build_p2pd.sh deleted file mode 100644 index 1801a3897..000000000 --- a/scripts/build_p2pd.sh +++ /dev/null @@ -1,123 +0,0 @@ -#!/bin/bash - -# Copyright (c) 2018-2020 Status Research & Development GmbH. Licensed under -# either of: -# - Apache License, version 2.0 -# - MIT license -# at your option. This file may not be copied, modified, or distributed except -# according to those terms. - -set -e - -force=false -verbose=false -CACHE_DIR="" -LIBP2P_COMMIT="124530a3" - -while [[ "$#" -gt 0 ]]; do - case "$1" in - -f|--force) force=true ;; - -v|--verbose) verbose=true ;; - -h|--help) - echo "Usage: $0 [-f|--force] [-v|--verbose] [CACHE_DIR] [COMMIT]" - exit 0 - ;; - *) - # First non-option is CACHE_DIR, second is LIBP2P_COMMIT - if [[ -z "$CACHE_DIR" ]]; then - CACHE_DIR="$1" - elif [[ "$LIBP2P_COMMIT" == "124530a3" ]]; then - LIBP2P_COMMIT="$1" - else - echo "Unknown argument: $1" - exit 1 - fi - ;; - esac - shift -done - -SUBREPO_DIR="vendor/go/src/github.com/libp2p/go-libp2p-daemon" -if [[ ! -e "$SUBREPO_DIR" ]]; then - SUBREPO_DIR="go-libp2p-daemon" - rm -rf "$SUBREPO_DIR" - git clone -q https://github.com/libp2p/go-libp2p-daemon - cd "$SUBREPO_DIR" - git checkout -q "$LIBP2P_COMMIT" - cd .. -fi - -## env vars -[[ -z "$BUILD_MSG" ]] && BUILD_MSG="Building p2pd ${LIBP2P_COMMIT}" - -# Windows detection -if uname | grep -qiE "mingw|msys"; then - EXE_SUFFIX=".exe" - # otherwise it fails in AppVeyor due to https://github.com/git-for-windows/git/issues/2495 - GIT_TIMESTAMP_ARG="--date=unix" # available since Git 2.9.4 -else - EXE_SUFFIX="" - GIT_TIMESTAMP_ARG="--date=format-local:%s" # available since Git 2.7.0 -fi - -TARGET_DIR="$(go env GOPATH)/bin" -TARGET_BINARY="${TARGET_DIR}/p2pd${EXE_SUFFIX}" - -target_needs_rebuilding() { - REBUILD=0 - NO_REBUILD=1 - - if [[ -n "$CACHE_DIR" && -e "${CACHE_DIR}/p2pd${EXE_SUFFIX}" ]]; then - mkdir -p "${TARGET_DIR}" - cp -a "$CACHE_DIR"/* "${TARGET_DIR}/" - fi - - # compare the built commit's timestamp to the date of the last commit (keep in mind that Git doesn't preserve file timestamps) - if [[ -e "${TARGET_DIR}/timestamp" && $(cat "${TARGET_DIR}/timestamp") -eq $(cd "$SUBREPO_DIR"; git log --pretty=format:%cd -n 1 ${GIT_TIMESTAMP_ARG}) ]]; then - return $NO_REBUILD - else - return $REBUILD - fi -} - -build_target() { - echo -e "$BUILD_MSG" - - pushd "$SUBREPO_DIR" - # Go module downloads can fail randomly in CI VMs, so retry them a few times - MAX_RETRIES=5 - CURR=0 - while [[ $CURR -lt $MAX_RETRIES ]]; do - FAILED=0 - go get ./... && break || FAILED=1 - CURR=$(( CURR + 1 )) - if $verbose; then - echo "retry #${CURR}" - fi - done - if [[ $FAILED == 1 ]]; then - echo "Error: still fails after retrying ${MAX_RETRIES} times." - exit 1 - fi - go install ./... - - # record the last commit's timestamp - git log --pretty=format:%cd -n 1 ${GIT_TIMESTAMP_ARG} > "${TARGET_DIR}/timestamp" - - popd - - # update the CI cache - if [[ -n "$CACHE_DIR" ]]; then - rm -rf "$CACHE_DIR" - mkdir "$CACHE_DIR" - cp -a "$TARGET_DIR"/* "$CACHE_DIR"/ - fi - echo "Binary built successfully: $TARGET_BINARY" -} - -if $force || target_needs_rebuilding; then - build_target -else - echo "No rebuild needed." -fi - diff --git a/tests/commoninterop.nim b/tests/commoninterop.nim deleted file mode 100644 index 8906a1eec..000000000 --- a/tests/commoninterop.nim +++ /dev/null @@ -1,665 +0,0 @@ -import chronos, chronicles, stew/byteutils -import helpers -import ../libp2p -import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto] -import ../libp2p/protocols/connectivity/relay/[relay, client, utils] - -type - SwitchCreator = proc( - ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), - prov = proc(config: TransportConfig): Transport = - TcpTransport.new({}, config.upgr), - relay: Relay = Relay.new(circuitRelayV1 = true), - ): Switch {.gcsafe, raises: [LPError].} - DaemonPeerInfo = daemonapi.PeerInfo - -proc writeLp(s: StreamTransport, msg: string | seq[byte]): Future[int] {.gcsafe.} = - ## write lenght prefixed - var buf = initVBuffer() - buf.writeSeq(msg) - buf.finish() - result = s.write(buf.buffer) - -proc readLp(s: StreamTransport): Future[seq[byte]] {.async.} = - ## read length prefixed msg - var - size: uint - length: int - res: VarintResult[void] - result = newSeq[byte](10) - - for i in 0 ..< len(result): - await s.readExactly(addr result[i], 1) - res = LP.getUVarint(result.toOpenArray(0, i), length, size) - if res.isOk(): - break - res.expect("Valid varint") - result.setLen(size) - if size > 0.uint: - await s.readExactly(addr result[0], int(size)) - -proc testPubSubDaemonPublish( - gossip: bool = false, count: int = 1, swCreator: SwitchCreator -) {.async.} = - var pubsubData = "TEST MESSAGE" - var testTopic = "test-topic" - var msgData = pubsubData.toBytes() - - var flags = {PSFloodSub} - if gossip: - flags = {PSGossipSub} - - let daemonNode = await newDaemonApi(flags) - let daemonPeer = await daemonNode.identity() - let nativeNode = swCreator() - - let pubsub = - if gossip: - GossipSub.init(switch = nativeNode).PubSub - else: - FloodSub.init(switch = nativeNode).PubSub - - nativeNode.mount(pubsub) - - await nativeNode.start() - await pubsub.start() - let nativePeer = nativeNode.peerInfo - - var finished = false - var times = 0 - proc nativeHandler(topic: string, data: seq[byte]) {.async.} = - let smsg = string.fromBytes(data) - check smsg == pubsubData - times.inc() - if times >= count and not finished: - finished = true - - await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses) - - await sleepAsync(500.millis) - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - - proc pubsubHandler( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async: (raises: [CatchableError]).} = - result = true # don't cancel subscription - - asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) - pubsub.subscribe(testTopic, nativeHandler) - await sleepAsync(3.seconds) - - proc publisher() {.async.} = - while not finished: - await daemonNode.pubsubPublish(testTopic, msgData) - await sleepAsync(250.millis) - - await wait(publisher(), 5.minutes) # should be plenty of time - - await nativeNode.stop() - await pubsub.stop() - await daemonNode.close() - -proc testPubSubNodePublish( - gossip: bool = false, count: int = 1, swCreator: SwitchCreator -) {.async.} = - var pubsubData = "TEST MESSAGE" - var testTopic = "test-topic" - var msgData = pubsubData.toBytes() - - var flags = {PSFloodSub} - if gossip: - flags = {PSGossipSub} - - let daemonNode = await newDaemonApi(flags) - let daemonPeer = await daemonNode.identity() - let nativeNode = swCreator() - - let pubsub = - if gossip: - GossipSub.init(switch = nativeNode).PubSub - else: - FloodSub.init(switch = nativeNode).PubSub - - nativeNode.mount(pubsub) - - await nativeNode.start() - await pubsub.start() - let nativePeer = nativeNode.peerInfo - - await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses) - - await sleepAsync(500.millis) - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - - var times = 0 - var finished = false - proc pubsubHandler( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async: (raises: [CatchableError]).} = - let smsg = string.fromBytes(message.data) - check smsg == pubsubData - times.inc() - if times >= count and not finished: - finished = true - result = true # don't cancel subscription - - discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler) - proc nativeHandler(topic: string, data: seq[byte]) {.async.} = - discard - - pubsub.subscribe(testTopic, nativeHandler) - await sleepAsync(3.seconds) - - proc publisher() {.async.} = - while not finished: - discard await pubsub.publish(testTopic, msgData) - await sleepAsync(250.millis) - - await wait(publisher(), 5.minutes) # should be plenty of time - - check finished - await nativeNode.stop() - await pubsub.stop() - await daemonNode.close() - -proc commonInteropTests*(name: string, swCreator: SwitchCreator) = - suite "Interop using " & name: - # TODO: chronos transports are leaking, - # but those are tracked for both the daemon - # and libp2p, so not sure which one it is, - # need to investigate more - # teardown: - # checkTrackers() - - # TODO: this test is failing sometimes on windows - # For some reason we receive EOF before test 4 sometimes - - asyncTest "native -> daemon multiple reads and writes": - var protos = @["/test-stream"] - - let nativeNode = swCreator() - - await nativeNode.start() - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() - - var testFuture = newFuture[void]("test.future") - proc daemonHandler( - api: DaemonAPI, stream: P2PStream - ) {.async: (raises: [CatchableError]).} = - check string.fromBytes(await stream.transp.readLp()) == "test 1" - discard await stream.transp.writeLp("test 2") - check string.fromBytes(await stream.transp.readLp()) == "test 3" - discard await stream.transp.writeLp("test 4") - testFuture.complete() - - await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) - await conn.writeLp("test 1") - check "test 2" == string.fromBytes((await conn.readLp(1024))) - - await conn.writeLp("test 3") - check "test 4" == string.fromBytes((await conn.readLp(1024))) - - await wait(testFuture, 10.secs) - - await nativeNode.stop() - await daemonNode.close() - - await sleepAsync(500.millis) - - asyncTest "native -> daemon connection": - var protos = @["/test-stream"] - var test = "TEST STRING" - # We are preparing expect string, which should be prefixed with varint - # length and do not have `\r\n` suffix, because we going to use - # readLine(). - var buffer = initVBuffer() - buffer.writeSeq(test & "\r\n") - buffer.finish() - var expect = newString(len(buffer) - 2) - copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) - - let nativeNode = swCreator() - - await nativeNode.start() - - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() - - var testFuture = newFuture[string]("test.future") - proc daemonHandler( - api: DaemonAPI, stream: P2PStream - ) {.async: (raises: [CatchableError]).} = - # We should perform `readLp()` instead of `readLine()`. `readLine()` - # here reads actually length prefixed string. - var line = await stream.transp.readLine() - check line == expect - testFuture.complete(line) - await stream.close() - - await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) - await conn.writeLp(test & "\r\n") - check expect == (await wait(testFuture, 10.secs)) - - await conn.close() - await nativeNode.stop() - await daemonNode.close() - - asyncTest "daemon -> native connection": - var protos = @["/test-stream"] - var test = "TEST STRING" - - var testFuture = newFuture[string]("test.future") - proc nativeHandler( - conn: Connection, proto: string - ) {.async: (raises: [CancelledError]).} = - try: - var line = string.fromBytes(await conn.readLp(1024)) - check line == test - testFuture.complete(line) - except CancelledError as e: - raise e - except CatchableError: - check false # should not be here - finally: - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let nativeNode = swCreator() - - nativeNode.mount(proto) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - discard await stream.transp.writeLp(test) - - check test == (await wait(testFuture, 10.secs)) - - await stream.close() - await nativeNode.stop() - await daemonNode.close() - await sleepAsync(500.millis) - - asyncTest "native -> daemon websocket connection": - var protos = @["/test-stream"] - var test = "TEST STRING" - - var testFuture = newFuture[string]("test.future") - proc nativeHandler( - conn: Connection, proto: string - ) {.async: (raises: [CancelledError]).} = - try: - var line = string.fromBytes(await conn.readLp(1024)) - check line == test - testFuture.complete(line) - except CancelledError as e: - raise e - except CatchableError: - check false # should not be here - finally: - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet() - - let nativeNode = swCreator( - ma = wsAddress, - prov = proc(config: TransportConfig): Transport = - WsTransport.new(config.upgr), - ) - - nativeNode.mount(proto) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress]) - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - discard await stream.transp.writeLp(test) - - check test == (await wait(testFuture, 10.secs)) - - await stream.close() - await nativeNode.stop() - await daemonNode.close() - await sleepAsync(500.millis) - - asyncTest "daemon -> native websocket connection": - var protos = @["/test-stream"] - var test = "TEST STRING" - # We are preparing expect string, which should be prefixed with varint - # length and do not have `\r\n` suffix, because we going to use - # readLine(). - var buffer = initVBuffer() - buffer.writeSeq(test & "\r\n") - buffer.finish() - var expect = newString(len(buffer) - 2) - copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) - - let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet() - let nativeNode = SwitchBuilder - .new() - .withAddress(wsAddress) - .withRng(crypto.newRng()) - .withMplex() - .withWsTransport() - .withNoise() - .build() - - await nativeNode.start() - - let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress]) - let daemonPeer = await daemonNode.identity() - - var testFuture = newFuture[string]("test.future") - proc daemonHandler( - api: DaemonAPI, stream: P2PStream - ) {.async: (raises: [CatchableError]).} = - # We should perform `readLp()` instead of `readLine()`. `readLine()` - # here reads actually length prefixed string. - var line = await stream.transp.readLine() - check line == expect - testFuture.complete(line) - await stream.close() - - await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) - await conn.writeLp(test & "\r\n") - check expect == (await wait(testFuture, 10.secs)) - - await conn.close() - await nativeNode.stop() - await daemonNode.close() - - asyncTest "daemon -> multiple reads and writes": - var protos = @["/test-stream"] - - var testFuture = newFuture[void]("test.future") - proc nativeHandler( - conn: Connection, proto: string - ) {.async: (raises: [CancelledError]).} = - try: - check "test 1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test 2".toBytes()) - - check "test 3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test 4".toBytes()) - - testFuture.complete() - except CancelledError as e: - raise e - except CatchableError: - check false # should not be here - finally: - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let nativeNode = swCreator() - - nativeNode.mount(proto) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - - asyncDiscard stream.transp.writeLp("test 1") - check "test 2" == string.fromBytes(await stream.transp.readLp()) - - asyncDiscard stream.transp.writeLp("test 3") - check "test 4" == string.fromBytes(await stream.transp.readLp()) - - await wait(testFuture, 10.secs) - - await stream.close() - await nativeNode.stop() - await daemonNode.close() - - asyncTest "read write multiple": - var protos = @["/test-stream"] - var test = "TEST STRING" - - var count = 0 - var testFuture = newFuture[int]("test.future") - proc nativeHandler( - conn: Connection, proto: string - ) {.async: (raises: [CancelledError]).} = - try: - while count < 10: - var line = string.fromBytes(await conn.readLp(1024)) - check line == test - await conn.writeLp(test.toBytes()) - count.inc() - - testFuture.complete(count) - except CancelledError as e: - raise e - except CatchableError: - check false # should not be here - finally: - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let nativeNode = swCreator() - - nativeNode.mount(proto) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - - var count2 = 0 - while count2 < 10: - discard await stream.transp.writeLp(test) - let line = await stream.transp.readLp() - check test == string.fromBytes(line) - inc(count2) - - check 10 == (await wait(testFuture, 1.minutes)) - await stream.close() - await nativeNode.stop() - await daemonNode.close() - - asyncTest "floodsub: daemon publish one": - await testPubSubDaemonPublish(swCreator = swCreator) - - asyncTest "floodsub: daemon publish many": - await testPubSubDaemonPublish(count = 10, swCreator = swCreator) - - asyncTest "gossipsub: daemon publish one": - await testPubSubDaemonPublish(gossip = true, swCreator = swCreator) - - asyncTest "gossipsub: daemon publish many": - await testPubSubDaemonPublish(gossip = true, count = 10, swCreator = swCreator) - - asyncTest "floodsub: node publish one": - await testPubSubNodePublish(swCreator = swCreator) - - asyncTest "floodsub: node publish many": - await testPubSubNodePublish(count = 10, swCreator = swCreator) - - asyncTest "gossipsub: node publish one": - await testPubSubNodePublish(gossip = true, swCreator = swCreator) - - asyncTest "gossipsub: node publish many": - await testPubSubNodePublish(gossip = true, count = 10, swCreator = swCreator) - -proc relayInteropTests*(name: string, relayCreator: SwitchCreator) = - suite "Interop relay using " & name: - asyncTest "NativeSrc -> NativeRelay -> DaemonDst": - let closeBlocker = newFuture[void]() - let daemonFinished = newFuture[void]() - # TODO: This Future blocks the daemonHandler after sending the last message. - # It exists because there's a strange behavior where stream.close sends - # a Rst instead of Fin. We should investigate this at some point. - proc daemonHandler( - api: DaemonAPI, stream: P2PStream - ) {.async: (raises: [CatchableError]).} = - check "line1" == string.fromBytes(await stream.transp.readLp()) - discard await stream.transp.writeLp("line2") - check "line3" == string.fromBytes(await stream.transp.readLp()) - discard await stream.transp.writeLp("line4") - await closeBlocker - await stream.close() - daemonFinished.complete() - - let - maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - src = relayCreator(maSrc, relay = RelayClient.new(circuitRelayV1 = true)) - rel = relayCreator(maRel) - - await src.start() - await rel.start() - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() - let maStr = - $rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit" - let maddr = MultiAddress.init(maStr).tryGet() - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await rel.connect(daemonPeer.peer, daemonPeer.addresses) - - await daemonNode.addHandler(@["/testCustom"], daemonHandler) - - let conn = await src.dial(daemonPeer.peer, @[maddr], @["/testCustom"]) - - await conn.writeLp("line1") - check string.fromBytes(await conn.readLp(1024)) == "line2" - - await conn.writeLp("line3") - check string.fromBytes(await conn.readLp(1024)) == "line4" - - closeBlocker.complete() - await daemonFinished - await conn.close() - await allFutures(src.stop(), rel.stop()) - try: - await daemonNode.close() - except CatchableError as e: - when defined(windows): - # On Windows, daemon close may fail due to socket race condition - # This is expected behavior and can be safely ignored - discard - else: - raise e - - asyncTest "DaemonSrc -> NativeRelay -> NativeDst": - proc customHandler( - conn: Connection, proto: string - ) {.async: (raises: [CancelledError]).} = - try: - check "line1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("line2") - check "line3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("line4") - except CancelledError as e: - raise e - except CatchableError: - check false # should not be here - finally: - await conn.close() - - let protos = @["/customProto", RelayV1Codec] - var customProto = new LPProtocol - customProto.handler = customHandler - customProto.codec = protos[0] - let - maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - rel = relayCreator(maRel) - dst = relayCreator(maDst, relay = RelayClient.new()) - - dst.mount(customProto) - await rel.start() - await dst.start() - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() - let maStr = - $rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit" - let maddr = MultiAddress.init(maStr).tryGet() - await daemonNode.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await rel.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) - await daemonNode.connect(dst.peerInfo.peerId, @[maddr]) - var stream = await daemonNode.openStream(dst.peerInfo.peerId, protos) - - discard await stream.transp.writeLp("line1") - check string.fromBytes(await stream.transp.readLp()) == "line2" - discard await stream.transp.writeLp("line3") - check string.fromBytes(await stream.transp.readLp()) == "line4" - - await allFutures(dst.stop(), rel.stop()) - await daemonNode.close() - - asyncTest "NativeSrc -> DaemonRelay -> NativeDst": - proc customHandler( - conn: Connection, proto: string - ) {.async: (raises: [CancelledError]).} = - try: - check "line1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("line2") - check "line3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("line4") - except CancelledError as e: - raise e - except CatchableError: - check false # should not be here - finally: - await conn.close() - - let protos = @["/customProto", RelayV1Codec] - var customProto = new LPProtocol - customProto.handler = customHandler - customProto.codec = protos[0] - let - maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - src = relayCreator(maSrc, relay = RelayClient.new()) - dst = relayCreator(maDst, relay = RelayClient.new()) - - dst.mount(customProto) - await src.start() - await dst.start() - let daemonNode = await newDaemonApi({RelayHop}) - let daemonPeer = await daemonNode.identity() - let maStr = $daemonPeer.addresses[0] & "/p2p/" & $daemonPeer.peer & "/p2p-circuit" - let maddr = MultiAddress.init(maStr).tryGet() - await src.connect(daemonPeer.peer, daemonPeer.addresses) - await daemonNode.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) - let conn = await src.dial(dst.peerInfo.peerId, @[maddr], protos[0]) - - await conn.writeLp("line1") - check string.fromBytes(await conn.readLp(1024)) == "line2" - - await conn.writeLp("line3") - check string.fromBytes(await conn.readLp(1024)) == "line4" - - await allFutures(src.stop(), dst.stop()) - await daemonNode.close() diff --git a/tests/testall.nim b/tests/testall.nim index 4dd6f58b4..8073d723f 100644 --- a/tests/testall.nim +++ b/tests/testall.nim @@ -1,3 +1,3 @@ {.used.} -import testnative, ./pubsub/testpubsub, testinterop, testdaemon +import testnative, ./pubsub/testpubsub diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim deleted file mode 100644 index 41bc97992..000000000 --- a/tests/testdaemon.nim +++ /dev/null @@ -1,123 +0,0 @@ -import chronos, unittest2, helpers -import - ../libp2p/daemon/daemonapi, - ../libp2p/multiaddress, - ../libp2p/multicodec, - ../libp2p/cid, - ../libp2p/multihash, - ../libp2p/peerid - -when defined(nimHasUsed): - {.used.} - -proc identitySpawnTest(): Future[bool] {.async.} = - var api = await newDaemonApi() - var data = await api.identity() - await api.close() - result = true - -proc connectStreamTest(): Future[bool] {.async.} = - var api1 = await newDaemonApi() - var api2 = await newDaemonApi() - - var id1 = await api1.identity() - var id2 = await api2.identity() - - var protos = @["/test-stream"] - var test = "TEST STRING" - - var testFuture = newFuture[string]("test.future") - - proc streamHandler( - api: DaemonAPI, stream: P2PStream - ) {.async: (raises: [CatchableError]).} = - var line = await stream.transp.readLine() - testFuture.complete(line) - - await api2.addHandler(protos, streamHandler) - await api1.connect(id2.peer, id2.addresses) - # echo await api1.listPeers() - var stream = await api1.openStream(id2.peer, protos) - let sent = await stream.transp.write(test & "\r\n") - doAssert(sent == len(test) + 2) - doAssert((await wait(testFuture, 10.seconds)) == test) - await stream.close() - await api1.close() - await api2.close() - result = true - -proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = - var pubsubData = "TEST MESSAGE" - var msgData = cast[seq[byte]](pubsubData) - var api1, api2: DaemonAPI - - api1 = await newDaemonApi(f) - api2 = await newDaemonApi(f) - - var id1 = await api1.identity() - var id2 = await api2.identity() - - var resultsCount = 0 - - var handlerFuture1 = newFuture[void]() - var handlerFuture2 = newFuture[void]() - - proc pubsubHandler1( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async: (raises: [CatchableError]).} = - let smsg = cast[string](message.data) - if smsg == pubsubData: - inc(resultsCount) - handlerFuture1.complete() - # Callback must return `false` to close subscription channel. - result = false - - proc pubsubHandler2( - api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async: (raises: [CatchableError]).} = - let smsg = cast[string](message.data) - if smsg == pubsubData: - inc(resultsCount) - handlerFuture2.complete() - # Callback must return `false` to close subscription channel. - result = false - - await api1.connect(id2.peer, id2.addresses) - await api2.connect(id1.peer, id1.addresses) - - var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1) - var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2) - - await sleepAsync(1.seconds) - - var topics1 = await api1.pubsubGetTopics() - var topics2 = await api2.pubsubGetTopics() - - if len(topics1) == 1 and len(topics2) == 1: - var peers1 = await api1.pubsubListPeers("test-topic") - var peers2 = await api2.pubsubListPeers("test-topic") - if len(peers1) == 1 and len(peers2) == 1: - # Publish test data via api1. - await sleepAsync(250.milliseconds) - await api1.pubsubPublish("test-topic", msgData) - var res = - await one(allFutures(handlerFuture1, handlerFuture2), sleepAsync(5.seconds)) - - await api1.close() - await api2.close() - if resultsCount == 2: - result = true - -suite "libp2p-daemon test suite": - test "Simple spawn and get identity test": - check: - waitFor(identitySpawnTest()) == true - test "Connect/Accept peer/stream test": - check: - waitFor(connectStreamTest()) == true - asyncTest "GossipSub test": - checkUntilTimeoutCustom(10.seconds, 100.milliseconds): - (await pubsubTest({PSGossipSub})) - asyncTest "FloodSub test": - checkUntilTimeoutCustom(10.seconds, 100.milliseconds): - (await pubsubTest({PSFloodSub})) diff --git a/tests/testinterop.nim b/tests/testinterop.nim deleted file mode 100644 index c2614af0e..000000000 --- a/tests/testinterop.nim +++ /dev/null @@ -1,58 +0,0 @@ -{.used.} - -import helpers, commoninterop -import ../libp2p -import ../libp2p/crypto/crypto, ../libp2p/protocols/connectivity/relay/relay - -proc switchMplexCreator( - ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), - prov = proc(config: TransportConfig): Transport = - TcpTransport.new({}, config.upgr), - relay: Relay = Relay.new(circuitRelayV1 = true), -): Switch {.raises: [LPError].} = - SwitchBuilder - .new() - .withSignedPeerRecord(false) - .withMaxConnections(MaxConnections) - .withRng(crypto.newRng()) - .withAddresses(@[ma]) - .withMaxIn(-1) - .withMaxOut(-1) - .withTransport(prov) - .withMplex() - .withMaxConnsPerPeer(MaxConnectionsPerPeer) - .withPeerStore(capacity = 1000) - .withNoise() - .withCircuitRelay(relay) - .withNameResolver(nil) - .build() - -proc switchYamuxCreator( - ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), - prov = proc(config: TransportConfig): Transport = - TcpTransport.new({}, config.upgr), - relay: Relay = Relay.new(circuitRelayV1 = true), -): Switch {.raises: [LPError].} = - SwitchBuilder - .new() - .withSignedPeerRecord(false) - .withMaxConnections(MaxConnections) - .withRng(crypto.newRng()) - .withAddresses(@[ma]) - .withMaxIn(-1) - .withMaxOut(-1) - .withTransport(prov) - .withYamux() - .withMaxConnsPerPeer(MaxConnectionsPerPeer) - .withPeerStore(capacity = 1000) - .withNoise() - .withCircuitRelay(relay) - .withNameResolver(nil) - .build() - -suite "Tests interop": - commonInteropTests("mplex", switchMplexCreator) - relayInteropTests("mplex", switchMplexCreator) - - commonInteropTests("yamux", switchYamuxCreator) - relayInteropTests("yamux", switchYamuxCreator) diff --git a/tests/testmultiaddress.nim b/tests/testmultiaddress.nim index f460e8a06..3d0f9f386 100644 --- a/tests/testmultiaddress.nim +++ b/tests/testmultiaddress.nim @@ -123,7 +123,7 @@ const "/ip4/127.0.0.1/ipfs", "/ip4/127.0.0.1/ipfs/tcp", "/p2p-circuit/50", ] - PathVectors = ["/unix/tmp/p2pd.sock", "/unix/a/b/c/d/e/f/g/h/i.sock"] + PathVectors = ["/unix/a/b/c/d/e/f/g/h/i.sock"] PathExpects = [ "90030E2F746D702F703270642E736F636B", diff --git a/tests/testrelayv1.nim b/tests/testrelayv1.nim index c076b6d65..76a354664 100644 --- a/tests/testrelayv1.nim +++ b/tests/testrelayv1.nim @@ -28,7 +28,6 @@ import builders, upgrademngrs/upgrade, varint, - daemon/daemonapi, ] import ./helpers