mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 12:28:18 -05:00
Compare commits
9 Commits
minimize_i
...
test-upgra
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65d1fbf6c2 | ||
|
|
2c3613577b | ||
|
|
ac90d29d21 | ||
|
|
af850b0595 | ||
|
|
ac5d23d145 | ||
|
|
db289ba468 | ||
|
|
6b1fd79556 | ||
|
|
a8d161bc9a | ||
|
|
79cddb0088 |
80
.github/workflows/ci.yml
vendored
80
.github/workflows/ci.yml
vendored
@@ -205,3 +205,83 @@ jobs:
|
||||
cd nim-libp2p
|
||||
nimble install -y --depsOnly
|
||||
nimble test
|
||||
|
||||
bumpNBC-stable:
|
||||
needs: build
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: status-im/github-app-token@v1
|
||||
name: Generate token
|
||||
id: generate-token
|
||||
with:
|
||||
app_id: ${{ secrets.BUMP_BOT_APP_ID }}
|
||||
private_key: ${{ secrets.BUMP_BOT_APP_PRIVATE_KEY }}
|
||||
|
||||
- name: Clone NBC
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
repository: status-im/nimbus-eth2
|
||||
path: nbc
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Checkout this ref
|
||||
run: |
|
||||
cd nbc/vendor/nim-libp2p
|
||||
git checkout $GITHUB_SHA
|
||||
|
||||
- name: Commit this bump
|
||||
run: |
|
||||
cd nbc
|
||||
git config --global user.email "${{ github.actor }}@users.noreply.github.com"
|
||||
git config --global user.name = "${{ github.actor }}"
|
||||
git commit -a -m "auto-bump nim-libp2p"
|
||||
|
||||
- name: Make PR
|
||||
uses: peter-evans/create-pull-request@v3.5.0
|
||||
with:
|
||||
branch: nim-libp2p-auto-bump
|
||||
path: nbc
|
||||
token: ${{ steps.generate-token.outputs.token }}
|
||||
title: nim-libp2p auto bump
|
||||
|
||||
bumpNBC-unstable:
|
||||
needs: build
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: status-im/github-app-token@v1
|
||||
name: Generate token
|
||||
id: generate-token
|
||||
with:
|
||||
app_id: ${{ secrets.BUMP_BOT_APP_ID }}
|
||||
private_key: ${{ secrets.BUMP_BOT_APP_PRIVATE_KEY }}
|
||||
|
||||
- name: Clone NBC
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
repository: status-im/nimbus-eth2
|
||||
ref: unstable
|
||||
path: nbc
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Checkout this ref
|
||||
run: |
|
||||
cd nbc/vendor/nim-libp2p
|
||||
git checkout $GITHUB_SHA
|
||||
|
||||
- name: Commit this bump
|
||||
run: |
|
||||
cd nbc
|
||||
git config --global user.email "${{ github.actor }}@users.noreply.github.com"
|
||||
git config --global user.name = "${{ github.actor }}"
|
||||
git commit -a -m "auto-bump nim-libp2p"
|
||||
|
||||
- name: Make PR
|
||||
uses: peter-evans/create-pull-request@v3.5.0
|
||||
with:
|
||||
branch: nim-libp2p-auto-bump-unstable
|
||||
path: nbc
|
||||
token: ${{ steps.generate-token.outputs.token }}
|
||||
title: nim-libp2p unstable auto bump
|
||||
draft: true
|
||||
|
||||
46
.github/workflows/nbc.yml
vendored
46
.github/workflows/nbc.yml
vendored
@@ -1,46 +0,0 @@
|
||||
name: NBC Bump PR
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
|
||||
jobs:
|
||||
bumpNBC:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: status-im/github-app-token@v1
|
||||
name: Generate token
|
||||
id: generate-token
|
||||
with:
|
||||
app_id: ${{ secrets.BUMP_BOT_APP_ID }}
|
||||
private_key: ${{ secrets.BUMP_BOT_APP_PRIVATE_KEY }}
|
||||
|
||||
- name: Clone NBC
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
repository: status-im/nimbus-eth2
|
||||
ref: unstable
|
||||
path: nbc
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Checkout this ref
|
||||
run: |
|
||||
cd nbc/vendor/nim-libp2p
|
||||
git checkout $GITHUB_SHA
|
||||
|
||||
- name: Commit this bump
|
||||
run: |
|
||||
cd nbc
|
||||
git config --global user.email "${{ github.actor }}@users.noreply.github.com"
|
||||
git config --global user.name = "${{ github.actor }}"
|
||||
git commit -a -m "auto-bump nim-libp2p"
|
||||
|
||||
- name: Make PR
|
||||
uses: peter-evans/create-pull-request@v3.5.0
|
||||
with:
|
||||
branch: nim-libp2p-auto-bump
|
||||
path: nbc
|
||||
token: ${{ steps.generate-token.outputs.token }}
|
||||
title: nim-libp2p auto bump
|
||||
@@ -25,7 +25,7 @@ const
|
||||
MaxConnectionsPerPeer* = 5
|
||||
|
||||
type
|
||||
TooManyConnectionsError* = object of CatchableError
|
||||
TooManyConnectionsError* = object of LPError
|
||||
|
||||
ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
|
||||
|
||||
@@ -262,7 +262,7 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
|
||||
proc onConnUpgraded(c: ConnManager, conn: Connection) {.async.} =
|
||||
try:
|
||||
trace "Triggering connect events", conn
|
||||
conn.upgrade()
|
||||
conn.upgradeComplete()
|
||||
|
||||
let peerId = conn.peerInfo.peerId
|
||||
await c.triggerPeerEvents(
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
import std/[os, osproc, strutils, tables, strtabs]
|
||||
import chronos, chronicles
|
||||
import ../varint, ../multiaddress, ../multicodec, ../cid, ../peerid
|
||||
import ../wire, ../multihash, ../protobuf/minprotobuf
|
||||
import ../wire, ../multihash, ../protobuf/minprotobuf, ../errors
|
||||
import ../crypto/crypto
|
||||
|
||||
export peerid, multiaddress, multicodec, multihash, cid, crypto, wire
|
||||
@@ -152,8 +152,9 @@ type
|
||||
ticket: PubsubTicket,
|
||||
message: PubSubMessage): Future[bool] {.gcsafe.}
|
||||
|
||||
DaemonRemoteError* = object of CatchableError
|
||||
DaemonLocalError* = object of CatchableError
|
||||
DaemonError* = object of LPError
|
||||
DaemonRemoteError* = object of DaemonError
|
||||
DaemonLocalError* = object of DaemonError
|
||||
|
||||
var daemonsCount {.threadvar.}: int
|
||||
|
||||
|
||||
46
libp2p/dial.nim
Normal file
46
libp2p/dial.nim
Normal file
@@ -0,0 +1,46 @@
|
||||
## Nim-LibP2P
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import chronos
|
||||
import peerid,
|
||||
stream/connection
|
||||
|
||||
type
|
||||
Dial* = ref object of RootObj
|
||||
|
||||
method connect*(
|
||||
self: Dial,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress]) {.async, base.} =
|
||||
## connect remote peer without negotiating
|
||||
## a protocol
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
method dial*(
|
||||
self: Dial,
|
||||
peerId: PeerID,
|
||||
protos: seq[string]): Future[Connection] {.async, base.} =
|
||||
## create a protocol stream over an
|
||||
## existing connection
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
method dial*(
|
||||
self: Dial,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress],
|
||||
protos: seq[string]): Future[Connection] {.async, base.} =
|
||||
## create a protocol stream and establish
|
||||
## a connection if one doesn't exist already
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
241
libp2p/dialer.nim
Normal file
241
libp2p/dialer.nim
Normal file
@@ -0,0 +1,241 @@
|
||||
## Nim-LibP2P
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import std/[sugar, tables]
|
||||
|
||||
import pkg/[chronos,
|
||||
chronicles,
|
||||
metrics]
|
||||
|
||||
import dial,
|
||||
peerid,
|
||||
peerinfo,
|
||||
multistream,
|
||||
connmanager,
|
||||
stream/connection,
|
||||
transports/transport
|
||||
|
||||
export dial
|
||||
|
||||
logScope:
|
||||
topics = "libp2p dialer"
|
||||
|
||||
declareCounter(libp2p_total_dial_attempts, "total attempted dials")
|
||||
declareCounter(libp2p_successful_dials, "dialed successful peers")
|
||||
declareCounter(libp2p_failed_dials, "failed dials")
|
||||
declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
|
||||
|
||||
type
|
||||
DialFailedError* = object of CatchableError
|
||||
|
||||
Dialer* = ref object of Dial
|
||||
peerInfo*: PeerInfo
|
||||
ms: MultistreamSelect
|
||||
connManager: ConnManager
|
||||
dialLock: Table[PeerID, AsyncLock]
|
||||
transports: seq[Transport]
|
||||
|
||||
proc dialAndUpgrade(
|
||||
self: Dialer,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress]):
|
||||
Future[Connection] {.async.} =
|
||||
debug "Dialing peer", peerId
|
||||
|
||||
# Avoid "cannot be captured as it would violate memory safety" errors in Nim-1.4.x.
|
||||
var
|
||||
transport: Transport
|
||||
address: MultiAddress
|
||||
|
||||
for t in self.transports: # for each transport
|
||||
transport = t
|
||||
for a in addrs: # for each address
|
||||
address = a
|
||||
if t.handles(a): # check if it can dial it
|
||||
trace "Dialing address", address = $a, peerId
|
||||
let dialed = try:
|
||||
libp2p_total_dial_attempts.inc()
|
||||
# await a connection slot when the total
|
||||
# connection count is equal to `maxConns`
|
||||
await self.connManager.trackOutgoingConn(
|
||||
() => transport.dial(address)
|
||||
)
|
||||
except TooManyConnectionsError as exc:
|
||||
trace "Connection limit reached!"
|
||||
raise exc
|
||||
except CancelledError as exc:
|
||||
debug "Dialing canceled", msg = exc.msg, peerId
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Dialing failed", msg = exc.msg, peerId
|
||||
libp2p_failed_dials.inc()
|
||||
continue # Try the next address
|
||||
|
||||
# make sure to assign the peer to the connection
|
||||
dialed.peerInfo = PeerInfo.init(peerId, addrs)
|
||||
|
||||
# also keep track of the connection's bottom unsafe transport direction
|
||||
# required by gossipsub scoring
|
||||
dialed.transportDir = Direction.Out
|
||||
|
||||
libp2p_successful_dials.inc()
|
||||
|
||||
let conn = try:
|
||||
await transport.upgradeOutgoing(dialed)
|
||||
except CatchableError as exc:
|
||||
# If we failed to establish the connection through one transport,
|
||||
# we won't succeeded through another - no use in trying again
|
||||
await dialed.close()
|
||||
debug "Upgrade failed", msg = exc.msg, peerId
|
||||
if exc isnot CancelledError:
|
||||
libp2p_failed_upgrades_outgoing.inc()
|
||||
raise exc
|
||||
|
||||
doAssert not isNil(conn), "connection died after upgradeOutgoing"
|
||||
debug "Dial successful", conn, peerInfo = conn.peerInfo
|
||||
return conn
|
||||
|
||||
proc internalConnect(
|
||||
self: Dialer,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress]):
|
||||
Future[Connection] {.async.} =
|
||||
if self.peerInfo.peerId == peerId:
|
||||
raise newException(CatchableError, "can't dial self!")
|
||||
|
||||
# Ensure there's only one in-flight attempt per peer
|
||||
let lock = self.dialLock.mgetOrPut(peerId, newAsyncLock())
|
||||
try:
|
||||
await lock.acquire()
|
||||
|
||||
# Check if we have a connection already and try to reuse it
|
||||
var conn = self.connManager.selectConn(peerId)
|
||||
if conn != nil:
|
||||
if conn.atEof or conn.closed:
|
||||
# This connection should already have been removed from the connection
|
||||
# manager - it's essentially a bug that we end up here - we'll fail
|
||||
# for now, hoping that this will clean themselves up later...
|
||||
warn "dead connection in connection manager", conn
|
||||
await conn.close()
|
||||
raise newException(DialFailedError, "Zombie connection encountered")
|
||||
|
||||
trace "Reusing existing connection", conn, direction = $conn.dir
|
||||
return conn
|
||||
|
||||
conn = await self.dialAndUpgrade(peerId, addrs)
|
||||
if isNil(conn): # None of the addresses connected
|
||||
raise newException(DialFailedError, "Unable to establish outgoing link")
|
||||
|
||||
# We already check for this in Connection manager
|
||||
# but a disconnect could have happened right after
|
||||
# we've added the connection so we check again
|
||||
# to prevent races due to that.
|
||||
if conn.closed() or conn.atEof():
|
||||
# This can happen when the other ends drops us
|
||||
# before we get a chance to return the connection
|
||||
# back to the dialer.
|
||||
trace "Connection dead on arrival", conn
|
||||
raise newLPStreamClosedError()
|
||||
|
||||
return conn
|
||||
finally:
|
||||
if lock.locked():
|
||||
lock.release()
|
||||
|
||||
method connect*(
|
||||
self: Dialer,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress]) {.async.} =
|
||||
## connect remote peer without negotiating
|
||||
## a protocol
|
||||
##
|
||||
|
||||
if self.connManager.connCount(peerId) > 0:
|
||||
return
|
||||
|
||||
discard await self.internalConnect(peerId, addrs)
|
||||
|
||||
proc negotiateStream(
|
||||
self: Dialer,
|
||||
conn: Connection,
|
||||
protos: seq[string]): Future[Connection] {.async.} =
|
||||
trace "Negotiating stream", conn, protos
|
||||
let selected = await self.ms.select(conn, protos)
|
||||
if not protos.contains(selected):
|
||||
await conn.closeWithEOF()
|
||||
raise newException(DialFailedError, "Unable to select sub-protocol " & $protos)
|
||||
|
||||
return conn
|
||||
|
||||
method dial*(
|
||||
self: Dialer,
|
||||
peerId: PeerID,
|
||||
protos: seq[string]): Future[Connection] {.async.} =
|
||||
## create a protocol stream over an
|
||||
## existing connection
|
||||
##
|
||||
|
||||
trace "Dialing (existing)", peerId, protos
|
||||
let stream = await self.connManager.getStream(peerId)
|
||||
if stream.isNil:
|
||||
raise newException(DialFailedError, "Couldn't get muxed stream")
|
||||
|
||||
return await self.negotiateStream(stream, protos)
|
||||
|
||||
method dial*(
|
||||
self: Dialer,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress],
|
||||
protos: seq[string]): Future[Connection] {.async.} =
|
||||
## create a protocol stream and establish
|
||||
## a connection if one doesn't exist already
|
||||
##
|
||||
|
||||
var
|
||||
conn: Connection
|
||||
stream: Connection
|
||||
|
||||
proc cleanup() {.async.} =
|
||||
if not(isNil(stream)):
|
||||
await stream.closeWithEOF()
|
||||
|
||||
if not(isNil(conn)):
|
||||
await conn.close()
|
||||
|
||||
try:
|
||||
trace "Dialing (new)", peerId, protos
|
||||
conn = await self.internalConnect(peerId, addrs)
|
||||
trace "Opening stream", conn
|
||||
stream = await self.connManager.getStream(conn)
|
||||
|
||||
if isNil(stream):
|
||||
raise newException(DialFailedError,
|
||||
"Couldn't get muxed stream")
|
||||
|
||||
return await self.negotiateStream(stream, protos)
|
||||
except CancelledError as exc:
|
||||
trace "Dial canceled", conn
|
||||
await cleanup()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Error dialing", conn, msg = exc.msg
|
||||
await cleanup()
|
||||
raise exc
|
||||
|
||||
proc new*(
|
||||
T: type Dialer,
|
||||
peerInfo: PeerInfo,
|
||||
connManager: ConnManager,
|
||||
transports: seq[Transport],
|
||||
ms: MultistreamSelect): Dialer =
|
||||
|
||||
T(peerInfo: peerInfo,
|
||||
connManager: connManager,
|
||||
transports: transports,
|
||||
ms: ms)
|
||||
@@ -5,6 +5,10 @@ import chronos
|
||||
import chronicles
|
||||
import macros
|
||||
|
||||
type
|
||||
# Base exception type for libp2p
|
||||
LPError* = object of CatchableError
|
||||
|
||||
# could not figure how to make it with a simple template
|
||||
# sadly nim needs more love for hygenic templates
|
||||
# so here goes the macro, its based on the proc/template version
|
||||
|
||||
@@ -15,7 +15,7 @@ import nativesockets, hashes
|
||||
import tables, strutils, stew/shims/net
|
||||
import chronos
|
||||
import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
|
||||
protobuf/minprotobuf
|
||||
protobuf/minprotobuf, errors
|
||||
import stew/[base58, base32, endians2, results]
|
||||
export results, minprotobuf, vbuffer
|
||||
|
||||
@@ -46,7 +46,7 @@ type
|
||||
|
||||
MaResult*[T] = Result[T, string]
|
||||
|
||||
MaError* = object of CatchableError
|
||||
MaError* = object of LPError
|
||||
MaInvalidAddress* = object of MaError
|
||||
|
||||
IpTransportProtocol* = enum
|
||||
|
||||
@@ -7,12 +7,12 @@
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import chronos
|
||||
import nimcrypto/utils, chronicles, stew/byteutils
|
||||
import pkg/[chronos, nimcrypto/utils, chronicles, stew/byteutils]
|
||||
import ../../stream/connection,
|
||||
../../utility,
|
||||
../../varint,
|
||||
../../vbuffer
|
||||
../../vbuffer,
|
||||
../muxer
|
||||
|
||||
logScope:
|
||||
topics = "libp2p mplexcoder"
|
||||
@@ -32,12 +32,12 @@ type
|
||||
msgType: MessageType
|
||||
data: seq[byte]
|
||||
|
||||
InvalidMplexMsgType = object of CatchableError
|
||||
InvalidMplexMsgType* = object of MuxerError
|
||||
|
||||
# https://github.com/libp2p/specs/tree/master/mplex#writing-to-a-stream
|
||||
const MaxMsgSize* = 1 shl 20 # 1mb
|
||||
|
||||
proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType =
|
||||
proc newInvalidMplexMsgType(): ref InvalidMplexMsgType =
|
||||
newException(InvalidMplexMsgType, "invalid message type")
|
||||
|
||||
proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =
|
||||
|
||||
@@ -8,10 +8,9 @@
|
||||
## those terms.
|
||||
|
||||
import std/[oids, strformat]
|
||||
import chronos, chronicles, metrics
|
||||
import pkg/[chronos, chronicles, metrics, nimcrypto/utils]
|
||||
import ./coder,
|
||||
../muxer,
|
||||
nimcrypto/utils,
|
||||
../../stream/[bufferstream, connection, streamseq],
|
||||
../../peerinfo
|
||||
|
||||
|
||||
@@ -32,8 +32,8 @@ when defined(libp2p_expensive_metrics):
|
||||
"mplex channels", labels = ["initiator", "peer"])
|
||||
|
||||
type
|
||||
TooManyChannels* = object of CatchableError
|
||||
InvalidChannelIdError* = object of CatchableError
|
||||
TooManyChannels* = object of MuxerError
|
||||
InvalidChannelIdError* = object of MuxerError
|
||||
|
||||
Mplex* = ref object of Muxer
|
||||
channels: array[bool, Table[uint64, LPChannel]]
|
||||
|
||||
@@ -19,6 +19,8 @@ const
|
||||
DefaultChanTimeout* = 5.minutes
|
||||
|
||||
type
|
||||
MuxerError* = object of LPError
|
||||
|
||||
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.}
|
||||
MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe.}
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ const
|
||||
#TODO: implement push identify, leaving out for now as it is not essential
|
||||
|
||||
type
|
||||
IdentifyError* = object of CatchableError
|
||||
IdentifyError* = object of LPError
|
||||
IdentityNoMatchError* = object of IdentifyError
|
||||
IdentityInvalidMsgError* = object of IdentifyError
|
||||
|
||||
|
||||
@@ -85,10 +85,11 @@ type
|
||||
readCs: CipherState
|
||||
writeCs: CipherState
|
||||
|
||||
NoiseHandshakeError* = object of CatchableError
|
||||
NoiseDecryptTagError* = object of CatchableError
|
||||
NoiseOversizedPayloadError* = object of CatchableError
|
||||
NoiseNonceMaxError* = object of CatchableError # drop connection on purpose
|
||||
NoiseError* = object of LPError
|
||||
NoiseHandshakeError* = object of NoiseError
|
||||
NoiseDecryptTagError* = object of NoiseError
|
||||
NoiseOversizedPayloadError* = object of NoiseError
|
||||
NoiseNonceMaxError* = object of NoiseError # drop connection on purpose
|
||||
|
||||
# Utility
|
||||
|
||||
@@ -577,8 +578,10 @@ method init*(p: Noise) {.gcsafe.} =
|
||||
p.codec = NoiseCodec
|
||||
|
||||
proc newNoise*(
|
||||
rng: ref BrHmacDrbgContext, privateKey: PrivateKey;
|
||||
outgoing: bool = true; commonPrologue: seq[byte] = @[]): Noise =
|
||||
rng: ref BrHmacDrbgContext,
|
||||
privateKey: PrivateKey,
|
||||
outgoing: bool = true,
|
||||
commonPrologue: seq[byte] = @[]): Noise =
|
||||
result = Noise(
|
||||
rng: rng,
|
||||
outgoing: outgoing,
|
||||
|
||||
@@ -68,7 +68,7 @@ type
|
||||
writerCoder: SecureCipher
|
||||
readerCoder: SecureCipher
|
||||
|
||||
SecioError* = object of CatchableError
|
||||
SecioError* = object of LPError
|
||||
|
||||
func shortLog*(conn: SecioConn): auto =
|
||||
if conn.isNil: "SecioConn(nil)"
|
||||
|
||||
@@ -4,7 +4,7 @@ import
|
||||
crypto/crypto, transports/[transport, tcptransport],
|
||||
muxers/[muxer, mplex/mplex],
|
||||
protocols/[identify, secure/secure, secure/noise],
|
||||
connmanager
|
||||
upgrademngrs/[upgrade, muxedupgrade], connmanager
|
||||
|
||||
export
|
||||
switch, peerid, peerinfo, connection, multiaddress, crypto
|
||||
@@ -39,13 +39,10 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
|
||||
let
|
||||
seckey = privKey.get(otherwise = PrivateKey.random(rng[]).tryGet())
|
||||
peerInfo = PeerInfo.init(seckey, [address])
|
||||
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
||||
transports = @[Transport(TcpTransport.init(transportFlags))]
|
||||
muxers = {MplexCodec: mplexProvider}.toTable
|
||||
identify = newIdentify(peerInfo)
|
||||
|
||||
var
|
||||
secureManagerInstances: seq[Secure]
|
||||
|
||||
for sec in secureManagers:
|
||||
case sec
|
||||
of SecureProtocol.Noise:
|
||||
@@ -53,15 +50,22 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
|
||||
of SecureProtocol.Secio:
|
||||
quit("Secio is deprecated!") # use of secio is unsafe
|
||||
|
||||
let
|
||||
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
||||
ms = newMultistream()
|
||||
identify = newIdentify(peerInfo)
|
||||
muxers = {MplexCodec: mplexProvider}.toTable
|
||||
connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut)
|
||||
muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagerInstances, connManager, ms)
|
||||
transports = @[Transport(TcpTransport.init(transportFlags, muxedUpgrade))]
|
||||
|
||||
let switch = newSwitch(
|
||||
peerInfo,
|
||||
transports,
|
||||
identify,
|
||||
muxers,
|
||||
secureManagers = secureManagerInstances,
|
||||
maxConnections = maxConnections,
|
||||
maxIn = maxIn,
|
||||
maxOut = maxOut,
|
||||
maxConnsPerPeer = maxConnsPerPeer)
|
||||
connManager = connManager,
|
||||
ms = ms)
|
||||
|
||||
return switch
|
||||
|
||||
@@ -178,7 +178,7 @@ method closeImpl*(s: BufferStream): Future[void] =
|
||||
#
|
||||
# - If a push was in progress but no reader is
|
||||
# attached we need to pop the queue
|
||||
# - If a read was in progress without without a
|
||||
# - If a read was in progress without a
|
||||
# push/data we need to push the Eof marker to
|
||||
# notify the reader that the channel closed
|
||||
#
|
||||
|
||||
@@ -11,9 +11,10 @@ import std/[hashes, oids, strformat]
|
||||
import chronicles, chronos, metrics
|
||||
import lpstream,
|
||||
../multiaddress,
|
||||
../peerinfo
|
||||
../peerinfo,
|
||||
../errors
|
||||
|
||||
export lpstream, peerinfo
|
||||
export lpstream, peerinfo, errors
|
||||
|
||||
logScope:
|
||||
topics = "libp2p connection"
|
||||
@@ -41,14 +42,14 @@ proc isUpgraded*(s: Connection): bool =
|
||||
if not isNil(s.upgraded):
|
||||
return s.upgraded.finished
|
||||
|
||||
proc upgrade*(s: Connection, failed: ref Exception = nil) =
|
||||
proc upgradeComplete*(s: Connection) =
|
||||
if not isNil(s.upgraded):
|
||||
if not isNil(failed):
|
||||
s.upgraded.fail(failed)
|
||||
return
|
||||
|
||||
s.upgraded.complete()
|
||||
|
||||
proc upgradeFail*(s: Connection, failed: ref Exception) =
|
||||
if not isNil(s.upgraded) and not isNil(failed):
|
||||
s.upgraded.fail(failed)
|
||||
|
||||
proc onUpgrade*(s: Connection) {.async.} =
|
||||
if not isNil(s.upgraded):
|
||||
await s.upgraded
|
||||
|
||||
@@ -12,7 +12,10 @@ import stew/byteutils
|
||||
import chronicles, chronos, metrics
|
||||
import ../varint,
|
||||
../peerinfo,
|
||||
../multiaddress
|
||||
../multiaddress,
|
||||
../errors
|
||||
|
||||
export errors
|
||||
|
||||
declareGauge(libp2p_open_streams,
|
||||
"open stream instances", labels = ["type", "dir"])
|
||||
@@ -39,7 +42,7 @@ type
|
||||
dir*: Direction
|
||||
closedWithEOF: bool # prevent concurrent calls
|
||||
|
||||
LPStreamError* = object of CatchableError
|
||||
LPStreamError* = object of LPError
|
||||
LPStreamIncompleteError* = object of LPStreamError
|
||||
LPStreamIncorrectDefect* = object of Defect
|
||||
LPStreamLimitError* = object of LPStreamError
|
||||
|
||||
@@ -31,9 +31,10 @@ import stream/connection,
|
||||
utils/semaphore,
|
||||
connmanager,
|
||||
peerid,
|
||||
errors
|
||||
errors,
|
||||
dialer
|
||||
|
||||
export connmanager, upgrade
|
||||
export connmanager, upgrade, dialer
|
||||
|
||||
logScope:
|
||||
topics = "libp2p switch"
|
||||
@@ -44,26 +45,19 @@ logScope:
|
||||
# and only if the channel has been secured (i.e. if a secure manager has been
|
||||
# previously provided)
|
||||
|
||||
declareCounter(libp2p_total_dial_attempts, "total attempted dials")
|
||||
declareCounter(libp2p_successful_dials, "dialed successful peers")
|
||||
declareCounter(libp2p_failed_dials, "failed dials")
|
||||
declareCounter(libp2p_failed_upgrades_incoming, "incoming connections failed upgrades")
|
||||
declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
|
||||
|
||||
const
|
||||
ConcurrentUpgrades* = 4
|
||||
|
||||
type
|
||||
DialFailedError* = object of CatchableError
|
||||
|
||||
Switch* = ref object of RootObj
|
||||
Switch* = ref object of Dial
|
||||
peerInfo*: PeerInfo
|
||||
connManager*: ConnManager
|
||||
transports*: seq[Transport]
|
||||
ms*: MultistreamSelect
|
||||
dialLock: Table[PeerID, AsyncLock]
|
||||
acceptFuts: seq[Future[void]]
|
||||
upgrade: Upgrade
|
||||
dialer*: Dial
|
||||
|
||||
proc addConnEventHandler*(s: Switch,
|
||||
handler: ConnEventHandler,
|
||||
@@ -97,186 +91,36 @@ proc isConnected*(s: Switch, peerId: PeerID): bool =
|
||||
proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
|
||||
s.connManager.dropPeer(peerId)
|
||||
|
||||
proc dialAndUpgrade(s: Switch,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress]):
|
||||
Future[Connection] {.async.} =
|
||||
debug "Dialing peer", peerId
|
||||
method connect*(
|
||||
s: Switch,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress]): Future[void] =
|
||||
s.dialer.connect(peerId, addrs)
|
||||
|
||||
# Avoid "cannot be captured as it would violate memory safety" errors in Nim-1.4.x.
|
||||
var
|
||||
transport: Transport
|
||||
address: MultiAddress
|
||||
|
||||
for t in s.transports: # for each transport
|
||||
transport = t
|
||||
for a in addrs: # for each address
|
||||
address = a
|
||||
if t.handles(a): # check if it can dial it
|
||||
trace "Dialing address", address = $a, peerId
|
||||
let dialed = try:
|
||||
libp2p_total_dial_attempts.inc()
|
||||
# await a connection slot when the total
|
||||
# connection count is equal to `maxConns`
|
||||
await s.connManager.trackOutgoingConn(
|
||||
() => transport.dial(address)
|
||||
)
|
||||
except TooManyConnectionsError as exc:
|
||||
trace "Connection limit reached!"
|
||||
raise exc
|
||||
except CancelledError as exc:
|
||||
debug "Dialing canceled", msg = exc.msg, peerId
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Dialing failed", msg = exc.msg, peerId
|
||||
libp2p_failed_dials.inc()
|
||||
continue # Try the next address
|
||||
|
||||
# make sure to assign the peer to the connection
|
||||
dialed.peerInfo = PeerInfo.init(peerId, addrs)
|
||||
|
||||
# also keep track of the connection's bottom unsafe transport direction
|
||||
# required by gossipsub scoring
|
||||
dialed.transportDir = Direction.Out
|
||||
|
||||
libp2p_successful_dials.inc()
|
||||
|
||||
let conn = try:
|
||||
await s.upgrade.upgradeOutgoing(dialed)
|
||||
except CatchableError as exc:
|
||||
# If we failed to establish the connection through one transport,
|
||||
# we won't succeeded through another - no use in trying again
|
||||
await dialed.close()
|
||||
debug "Upgrade failed", msg = exc.msg, peerId
|
||||
if exc isnot CancelledError:
|
||||
libp2p_failed_upgrades_outgoing.inc()
|
||||
raise exc
|
||||
|
||||
doAssert not isNil(conn), "connection died after upgradeOutgoing"
|
||||
debug "Dial successful", conn, peerInfo = conn.peerInfo
|
||||
return conn
|
||||
|
||||
proc internalConnect(s: Switch,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress]):
|
||||
Future[Connection] {.async.} =
|
||||
if s.peerInfo.peerId == peerId:
|
||||
raise newException(CatchableError, "can't dial self!")
|
||||
|
||||
# Ensure there's only one in-flight attempt per peer
|
||||
let lock = s.dialLock.mgetOrPut(peerId, newAsyncLock())
|
||||
try:
|
||||
await lock.acquire()
|
||||
|
||||
# Check if we have a connection already and try to reuse it
|
||||
var conn = s.connManager.selectConn(peerId)
|
||||
if conn != nil:
|
||||
if conn.atEof or conn.closed:
|
||||
# This connection should already have been removed from the connection
|
||||
# manager - it's essentially a bug that we end up here - we'll fail
|
||||
# for now, hoping that this will clean themselves up later...
|
||||
warn "dead connection in connection manager", conn
|
||||
await conn.close()
|
||||
raise newException(DialFailedError, "Zombie connection encountered")
|
||||
|
||||
trace "Reusing existing connection", conn, direction = $conn.dir
|
||||
return conn
|
||||
|
||||
conn = await s.dialAndUpgrade(peerId, addrs)
|
||||
if isNil(conn): # None of the addresses connected
|
||||
raise newException(DialFailedError, "Unable to establish outgoing link")
|
||||
|
||||
# We already check for this in Connection manager
|
||||
# but a disconnect could have happened right after
|
||||
# we've added the connection so we check again
|
||||
# to prevent races due to that.
|
||||
if conn.closed() or conn.atEof():
|
||||
# This can happen when the other ends drops us
|
||||
# before we get a chance to return the connection
|
||||
# back to the dialer.
|
||||
trace "Connection dead on arrival", conn
|
||||
raise newLPStreamClosedError()
|
||||
|
||||
return conn
|
||||
finally:
|
||||
if lock.locked():
|
||||
lock.release()
|
||||
|
||||
proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
|
||||
## attempt to create establish a connection
|
||||
## with a remote peer
|
||||
##
|
||||
|
||||
if s.connManager.connCount(peerId) > 0:
|
||||
return
|
||||
|
||||
discard await s.internalConnect(peerId, addrs)
|
||||
|
||||
proc negotiateStream(s: Switch, conn: Connection, protos: seq[string]): Future[Connection] {.async.} =
|
||||
trace "Negotiating stream", conn, protos
|
||||
let selected = await s.ms.select(conn, protos)
|
||||
if not protos.contains(selected):
|
||||
await conn.closeWithEOF()
|
||||
raise newException(DialFailedError, "Unable to select sub-protocol " & $protos)
|
||||
|
||||
return conn
|
||||
|
||||
proc dial*(s: Switch,
|
||||
peerId: PeerID,
|
||||
protos: seq[string]): Future[Connection] {.async.} =
|
||||
trace "Dialing (existing)", peerId, protos
|
||||
let stream = await s.connManager.getStream(peerId)
|
||||
if stream.isNil:
|
||||
raise newException(DialFailedError, "Couldn't get muxed stream")
|
||||
|
||||
return await s.negotiateStream(stream, protos)
|
||||
method dial*(
|
||||
s: Switch,
|
||||
peerId: PeerID,
|
||||
protos: seq[string]): Future[Connection] =
|
||||
s.dialer.dial(peerId, protos)
|
||||
|
||||
proc dial*(s: Switch,
|
||||
peerId: PeerID,
|
||||
proto: string): Future[Connection] =
|
||||
dial(s, peerId, @[proto])
|
||||
|
||||
proc dial*(s: Switch,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress],
|
||||
protos: seq[string]):
|
||||
Future[Connection] {.async.} =
|
||||
var
|
||||
conn: Connection
|
||||
stream: Connection
|
||||
method dial*(
|
||||
s: Switch,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress],
|
||||
protos: seq[string]): Future[Connection] =
|
||||
s.dialer.dial(peerId, addrs, protos)
|
||||
|
||||
proc cleanup() {.async.} =
|
||||
if not(isNil(stream)):
|
||||
await stream.closeWithEOF()
|
||||
|
||||
if not(isNil(conn)):
|
||||
await conn.close()
|
||||
|
||||
try:
|
||||
trace "Dialing (new)", peerId, protos
|
||||
conn = await s.internalConnect(peerId, addrs)
|
||||
trace "Opening stream", conn
|
||||
stream = await s.connManager.getStream(conn)
|
||||
|
||||
if isNil(stream):
|
||||
raise newException(DialFailedError,
|
||||
"Couldn't get muxed stream")
|
||||
|
||||
return await s.negotiateStream(stream, protos)
|
||||
except CancelledError as exc:
|
||||
trace "Dial canceled", conn
|
||||
await cleanup()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Error dialing", conn, msg = exc.msg
|
||||
await cleanup()
|
||||
raise exc
|
||||
|
||||
proc dial*(s: Switch,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress],
|
||||
proto: string):
|
||||
Future[Connection] = dial(s, peerId, addrs, @[proto])
|
||||
proc dial*(
|
||||
s: Switch,
|
||||
peerId: PeerID,
|
||||
addrs: seq[MultiAddress],
|
||||
proto: string): Future[Connection] =
|
||||
dial(s, peerId, addrs, @[proto])
|
||||
|
||||
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) {.gcsafe.} =
|
||||
if isNil(proto.handler):
|
||||
@@ -288,6 +132,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) {.gcsafe
|
||||
"Protocol has to define a codec string")
|
||||
|
||||
s.ms.addHandler(proto.codecs, proto, matcher)
|
||||
s.peerInfo.protocols.add(proto.codec)
|
||||
|
||||
proc upgradeMonitor(conn: Connection, upgrades: AsyncSemaphore) {.async.} =
|
||||
## monitor connection for upgrades
|
||||
@@ -344,7 +189,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
|
||||
|
||||
debug "Accepted an incoming connection", conn
|
||||
asyncSpawn upgradeMonitor(conn, upgrades)
|
||||
asyncSpawn s.upgrade.upgradeIncoming(conn)
|
||||
asyncSpawn transport.upgradeIncoming(conn)
|
||||
except CancelledError as exc:
|
||||
trace "releasing semaphore on cancellation"
|
||||
upgrades.release() # always release the slot
|
||||
@@ -402,24 +247,17 @@ proc newSwitch*(peerInfo: PeerInfo,
|
||||
identity: Identify,
|
||||
muxers: Table[string, MuxerProvider],
|
||||
secureManagers: openarray[Secure] = [],
|
||||
maxConnections = MaxConnections,
|
||||
maxIn = -1,
|
||||
maxOut = -1,
|
||||
maxConnsPerPeer = MaxConnectionsPerPeer): Switch =
|
||||
connManager: ConnManager,
|
||||
ms: MultistreamSelect): Switch =
|
||||
if secureManagers.len == 0:
|
||||
raise (ref CatchableError)(msg: "Provide at least one secure manager")
|
||||
|
||||
let ms = newMultistream()
|
||||
let connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut)
|
||||
let upgrade = MuxedUpgrade.init(identity, muxers, secureManagers, connManager, ms)
|
||||
|
||||
let switch = Switch(
|
||||
peerInfo: peerInfo,
|
||||
ms: ms,
|
||||
transports: transports,
|
||||
connManager: connManager,
|
||||
upgrade: upgrade,
|
||||
)
|
||||
dialer: Dialer.new(peerInfo, connManager, transports, ms))
|
||||
|
||||
switch.mount(identity)
|
||||
return switch
|
||||
|
||||
@@ -7,15 +7,18 @@
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import std/[oids, sequtils]
|
||||
import std/[oids, sequtils, tables]
|
||||
import chronos, chronicles
|
||||
import transport,
|
||||
../errors,
|
||||
../wire,
|
||||
../multiaddress,
|
||||
../multicodec,
|
||||
../multistream,
|
||||
../connmanager,
|
||||
../multiaddress,
|
||||
../stream/connection,
|
||||
../stream/chronosstream
|
||||
../stream/chronosstream,
|
||||
../upgrademngrs/upgrade
|
||||
|
||||
logScope:
|
||||
topics = "libp2p tcptransport"
|
||||
@@ -59,7 +62,7 @@ proc setupTcpTransportTracker(): TcpTransportTracker =
|
||||
result.isLeaked = leakTransport
|
||||
addTracker(TcpTransportTrackerName, result)
|
||||
|
||||
proc connHandler*(t: TcpTransport,
|
||||
proc connHandler*(self: TcpTransport,
|
||||
client: StreamTransport,
|
||||
dir: Direction): Future[Connection] {.async.} =
|
||||
var observedAddr: MultiAddress = MultiAddress()
|
||||
@@ -73,8 +76,8 @@ proc connHandler*(t: TcpTransport,
|
||||
|
||||
trace "Handling tcp connection", address = $observedAddr,
|
||||
dir = $dir,
|
||||
clients = t.clients[Direction.In].len +
|
||||
t.clients[Direction.Out].len
|
||||
clients = self.clients[Direction.In].len +
|
||||
self.clients[Direction.Out].len
|
||||
|
||||
let conn = Connection(
|
||||
ChronosStream.init(
|
||||
@@ -93,7 +96,7 @@ proc connHandler*(t: TcpTransport,
|
||||
trace "Cleaning up client", addrs = $client.remoteAddress,
|
||||
conn
|
||||
|
||||
t.clients[dir].keepItIf( it != client )
|
||||
self.clients[dir].keepItIf( it != client )
|
||||
await allFuturesThrowing(
|
||||
conn.close(), client.closeWait())
|
||||
|
||||
@@ -104,82 +107,108 @@ proc connHandler*(t: TcpTransport,
|
||||
let useExc {.used.} = exc
|
||||
debug "Error cleaning up client", errMsg = exc.msg, conn
|
||||
|
||||
t.clients[dir].add(client)
|
||||
self.clients[dir].add(client)
|
||||
asyncSpawn onClose()
|
||||
|
||||
return conn
|
||||
|
||||
proc init*(T: type TcpTransport,
|
||||
flags: set[ServerFlags] = {}): T =
|
||||
result = T(flags: flags)
|
||||
func init*(
|
||||
T: type TcpTransport,
|
||||
flags: set[ServerFlags] = {},
|
||||
upgrade: Upgrade): T =
|
||||
|
||||
result = T(
|
||||
flags: flags,
|
||||
upgrader: upgrade
|
||||
)
|
||||
|
||||
result.initTransport()
|
||||
|
||||
method initTransport*(t: TcpTransport) =
|
||||
t.multicodec = multiCodec("tcp")
|
||||
method initTransport*(self: TcpTransport) =
|
||||
self.multicodec = multiCodec("tcp")
|
||||
inc getTcpTransportTracker().opened
|
||||
|
||||
method start*(t: TcpTransport, ma: MultiAddress) {.async.} =
|
||||
method start*(
|
||||
self: TcpTransport,
|
||||
ma: MultiAddress) {.async.} =
|
||||
## listen on the transport
|
||||
##
|
||||
|
||||
if t.running:
|
||||
if self.running:
|
||||
trace "TCP transport already running"
|
||||
return
|
||||
|
||||
await procCall Transport(t).start(ma)
|
||||
await procCall Transport(self).start(ma)
|
||||
trace "Starting TCP transport"
|
||||
|
||||
t.server = createStreamServer(
|
||||
ma = t.ma,
|
||||
flags = t.flags,
|
||||
udata = t)
|
||||
self.server = createStreamServer(
|
||||
ma = self.ma,
|
||||
flags = self.flags,
|
||||
udata = self)
|
||||
|
||||
# always get the resolved address in case we're bound to 0.0.0.0:0
|
||||
t.ma = MultiAddress.init(t.server.sock.getLocalAddress()).tryGet()
|
||||
t.running = true
|
||||
self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet()
|
||||
self.running = true
|
||||
|
||||
trace "Listening on", address = t.ma
|
||||
trace "Listening on", address = self.ma
|
||||
|
||||
method stop*(t: TcpTransport) {.async, gcsafe.} =
|
||||
method stop*(self: TcpTransport) {.async, gcsafe.} =
|
||||
## stop the transport
|
||||
##
|
||||
|
||||
t.running = false # mark stopped as soon as possible
|
||||
self.running = false # mark stopped as soon as possible
|
||||
|
||||
try:
|
||||
trace "Stopping TCP transport"
|
||||
await procCall Transport(t).stop() # call base
|
||||
await procCall Transport(self).stop() # call base
|
||||
|
||||
checkFutures(
|
||||
await allFinished(
|
||||
t.clients[Direction.In].mapIt(it.closeWait()) &
|
||||
t.clients[Direction.Out].mapIt(it.closeWait())))
|
||||
self.clients[Direction.In].mapIt(it.closeWait()) &
|
||||
self.clients[Direction.Out].mapIt(it.closeWait())))
|
||||
|
||||
# server can be nil
|
||||
if not isNil(t.server):
|
||||
await t.server.closeWait()
|
||||
if not isNil(self.server):
|
||||
await self.server.closeWait()
|
||||
|
||||
t.server = nil
|
||||
self.server = nil
|
||||
trace "Transport stopped"
|
||||
inc getTcpTransportTracker().closed
|
||||
except CatchableError as exc:
|
||||
trace "Error shutting down tcp transport", exc = exc.msg
|
||||
|
||||
method accept*(t: TcpTransport): Future[Connection] {.async, gcsafe.} =
|
||||
method upgradeIncoming*(
|
||||
self: TcpTransport,
|
||||
conn: Connection): Future[void] {.gcsafe.} =
|
||||
## base upgrade method that the transport uses to perform
|
||||
## transport specific upgrades
|
||||
##
|
||||
|
||||
self.upgrader.upgradeIncoming(conn)
|
||||
|
||||
method upgradeOutgoing*(
|
||||
self: TcpTransport,
|
||||
conn: Connection): Future[Connection] {.gcsafe.} =
|
||||
## base upgrade method that the transport uses to perform
|
||||
## transport specific upgrades
|
||||
##
|
||||
|
||||
self.upgrader.upgradeOutgoing(conn)
|
||||
|
||||
method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
|
||||
## accept a new TCP connection
|
||||
##
|
||||
|
||||
if not t.running:
|
||||
if not self.running:
|
||||
raise newTransportClosedError()
|
||||
|
||||
try:
|
||||
let transp = await t.server.accept()
|
||||
return await t.connHandler(transp, Direction.In)
|
||||
let transp = await self.server.accept()
|
||||
return await self.connHandler(transp, Direction.In)
|
||||
except TransportOsError as exc:
|
||||
# TODO: it doesn't sound like all OS errors
|
||||
# can be ignored, we should re-raise those
|
||||
# that can't.
|
||||
# that can'self.
|
||||
debug "OS Error", exc = exc.msg
|
||||
except TransportTooManyError as exc:
|
||||
debug "Too many files opened", exc = exc.msg
|
||||
@@ -190,19 +219,21 @@ method accept*(t: TcpTransport): Future[Connection] {.async, gcsafe.} =
|
||||
warn "Unexpected error creating connection", exc = exc.msg
|
||||
raise exc
|
||||
|
||||
method dial*(t: TcpTransport,
|
||||
address: MultiAddress):
|
||||
Future[Connection] {.async, gcsafe.} =
|
||||
method dial*(
|
||||
self: TcpTransport,
|
||||
address: MultiAddress): Future[Connection] {.async, gcsafe.} =
|
||||
## dial a peer
|
||||
##
|
||||
|
||||
trace "Dialing remote peer", address = $address
|
||||
|
||||
let transp = await connect(address)
|
||||
return await t.connHandler(transp, Direction.Out)
|
||||
return await self.connHandler(transp, Direction.Out)
|
||||
|
||||
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||
if procCall Transport(t).handles(address):
|
||||
method handles*(
|
||||
self: TcpTransport,
|
||||
address: MultiAddress): bool {.gcsafe.} =
|
||||
if procCall Transport(self).handles(address):
|
||||
return address.protocols
|
||||
.tryGet()
|
||||
.filterIt( it == multiCodec("tcp") )
|
||||
|
||||
@@ -12,66 +12,84 @@ import sequtils
|
||||
import chronos, chronicles
|
||||
import ../stream/connection,
|
||||
../multiaddress,
|
||||
../multicodec
|
||||
../multicodec,
|
||||
../upgrademngrs/upgrade
|
||||
|
||||
logScope:
|
||||
topics = "libp2p transport"
|
||||
|
||||
type
|
||||
TransportClosedError* = object of CatchableError
|
||||
TransportError* = object of LPError
|
||||
TransportClosedError* = object of TransportError
|
||||
|
||||
Transport* = ref object of RootObj
|
||||
ma*: Multiaddress
|
||||
multicodec*: MultiCodec
|
||||
running*: bool
|
||||
upgrader*: Upgrade
|
||||
multicodec*: MultiCodec
|
||||
|
||||
proc newTransportClosedError*(parent: ref Exception = nil): ref CatchableError =
|
||||
newException(TransportClosedError,
|
||||
"Transport closed, no more connections!", parent)
|
||||
|
||||
method initTransport*(t: Transport) {.base, gcsafe, locks: "unknown".} =
|
||||
method initTransport*(self: Transport) {.base, gcsafe, locks: "unknown".} =
|
||||
## perform protocol initialization
|
||||
##
|
||||
|
||||
discard
|
||||
|
||||
method start*(t: Transport, ma: MultiAddress) {.base, async.} =
|
||||
method start*(
|
||||
self: Transport,
|
||||
ma: MultiAddress): Future[void] {.base, async.} =
|
||||
## start the transport
|
||||
##
|
||||
|
||||
t.ma = ma
|
||||
self.ma = ma
|
||||
trace "starting transport", address = $ma
|
||||
|
||||
method stop*(t: Transport) {.base, async.} =
|
||||
method stop*(self: Transport): Future[void] {.base, async.} =
|
||||
## stop and cleanup the transport
|
||||
## including all outstanding connections
|
||||
##
|
||||
|
||||
discard
|
||||
|
||||
method accept*(t: Transport): Future[Connection]
|
||||
{.base, async, gcsafe.} =
|
||||
method accept*(self: Transport): Future[Connection]
|
||||
{.base, gcsafe.} =
|
||||
## accept incoming connections
|
||||
##
|
||||
|
||||
discard
|
||||
|
||||
method dial*(t: Transport,
|
||||
address: MultiAddress): Future[Connection]
|
||||
{.base, async, gcsafe.} =
|
||||
method dial*(
|
||||
self: Transport,
|
||||
address: MultiAddress): Future[Connection] {.base, gcsafe.} =
|
||||
## dial a peer
|
||||
##
|
||||
|
||||
discard
|
||||
|
||||
method upgrade*(t: Transport) {.base, async, gcsafe.} =
|
||||
method upgradeIncoming*(
|
||||
self: Transport,
|
||||
conn: Connection): Future[void] {.base, gcsafe.} =
|
||||
## base upgrade method that the transport uses to perform
|
||||
## transport specific upgrades
|
||||
##
|
||||
|
||||
discard
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} =
|
||||
method upgradeOutgoing*(
|
||||
self: Transport,
|
||||
conn: Connection): Future[Connection] {.base, gcsafe.} =
|
||||
## base upgrade method that the transport uses to perform
|
||||
## transport specific upgrades
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
method handles*(
|
||||
self: Transport,
|
||||
address: MultiAddress): bool {.base, gcsafe.} =
|
||||
## check if transport supports the multiaddress
|
||||
##
|
||||
|
||||
@@ -79,7 +97,7 @@ method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} =
|
||||
# having to repeat the check in every transport
|
||||
address.protocols.tryGet().filterIt( it == multiCodec("p2p-circuit") ).len == 0
|
||||
|
||||
method localAddress*(t: Transport): MultiAddress {.base, gcsafe.} =
|
||||
method localAddress*(self: Transport): MultiAddress {.base, gcsafe.} =
|
||||
## get the local address of the transport in case started with 0.0.0.0:0
|
||||
##
|
||||
|
||||
|
||||
@@ -15,31 +15,38 @@ import ../upgrademngrs/upgrade,
|
||||
|
||||
export Upgrade
|
||||
|
||||
logScope:
|
||||
topics = "libp2p muxedupgrade"
|
||||
|
||||
type
|
||||
MuxedUpgrade* = ref object of Upgrade
|
||||
muxers*: Table[string, MuxerProvider]
|
||||
streamHandler*: StreamHandler
|
||||
|
||||
proc identify*(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} =
|
||||
proc identify*(
|
||||
self: MuxedUpgrade,
|
||||
muxer: Muxer) {.async, gcsafe.} =
|
||||
# new stream for identify
|
||||
var stream = await muxer.newStream()
|
||||
if stream == nil:
|
||||
return
|
||||
|
||||
try:
|
||||
await u.identify(stream)
|
||||
await self.identify(stream)
|
||||
finally:
|
||||
await stream.closeWithEOF()
|
||||
|
||||
proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
||||
proc mux*(
|
||||
self: MuxedUpgrade,
|
||||
conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
||||
## mux incoming connection
|
||||
|
||||
trace "Muxing connection", conn
|
||||
if u.muxers.len == 0:
|
||||
if self.muxers.len == 0:
|
||||
warn "no muxers registered, skipping upgrade flow", conn
|
||||
return
|
||||
|
||||
let muxerName = await u.ms.select(conn, toSeq(u.muxers.keys()))
|
||||
let muxerName = await self.ms.select(conn, toSeq(self.muxers.keys()))
|
||||
if muxerName.len == 0 or muxerName == "na":
|
||||
debug "no muxer available, early exit", conn
|
||||
return
|
||||
@@ -47,18 +54,18 @@ proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
||||
trace "Found a muxer", conn, muxerName
|
||||
|
||||
# create new muxer for connection
|
||||
let muxer = u.muxers[muxerName].newMuxer(conn)
|
||||
let muxer = self.muxers[muxerName].newMuxer(conn)
|
||||
|
||||
# install stream handler
|
||||
muxer.streamHandler = u.streamHandler
|
||||
muxer.streamHandler = self.streamHandler
|
||||
|
||||
u.connManager.storeConn(conn)
|
||||
self.connManager.storeConn(conn)
|
||||
|
||||
# store it in muxed connections if we have a peer for it
|
||||
u.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop
|
||||
self.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop
|
||||
|
||||
try:
|
||||
await u.identify(muxer)
|
||||
await self.identify(muxer)
|
||||
except CatchableError as exc:
|
||||
# Identify is non-essential, though if it fails, it might indicate that
|
||||
# the connection was closed already - this will be picked up by the read
|
||||
@@ -67,10 +74,12 @@ proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
||||
|
||||
return muxer
|
||||
|
||||
method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||
method upgradeOutgoing*(
|
||||
self: MuxedUpgrade,
|
||||
conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||
trace "Upgrading outgoing connection", conn
|
||||
|
||||
let sconn = await u.secure(conn) # secure the connection
|
||||
let sconn = await self.secure(conn) # secure the connection
|
||||
if isNil(sconn):
|
||||
raise newException(UpgradeFailedError,
|
||||
"unable to secure connection, stopping upgrade")
|
||||
@@ -79,7 +88,7 @@ method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] {
|
||||
raise newException(UpgradeFailedError,
|
||||
"current version of nim-libp2p requires that secure protocol negotiates peerid")
|
||||
|
||||
let muxer = await u.mux(sconn) # mux it if possible
|
||||
let muxer = await self.mux(sconn) # mux it if possible
|
||||
if muxer == nil:
|
||||
# TODO this might be relaxed in the future
|
||||
raise newException(UpgradeFailedError,
|
||||
@@ -94,45 +103,67 @@ method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] {
|
||||
|
||||
return sconn
|
||||
|
||||
method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsafe.} = # noraises
|
||||
proc securedMuxableConnection*(
|
||||
self: MuxedUpgrade,
|
||||
conn: Connection,
|
||||
proto: string,
|
||||
ms: MultistreamSelect) {.async, gcsafe.} =
|
||||
## Secure and handle a muxable incoming
|
||||
## connection.
|
||||
##
|
||||
## This means that the incoming
|
||||
## connection should be "securable" and
|
||||
## "muxable".
|
||||
##
|
||||
## This is also closely related
|
||||
## to transports, some could potentially
|
||||
## not require neither securing nor muxing,
|
||||
## some might require either one of this.
|
||||
##
|
||||
|
||||
trace "Starting secure handler", conn
|
||||
let secure = self.secureManagers.filterIt(it.codec == proto)[0]
|
||||
var cconn = conn
|
||||
try:
|
||||
var sconn = await secure.secure(cconn, false)
|
||||
if isNil(sconn):
|
||||
return
|
||||
|
||||
cconn = sconn
|
||||
# add the muxer
|
||||
for muxer in self.muxers.values:
|
||||
ms.addHandler(muxer.codecs, muxer)
|
||||
|
||||
# handle subsequent secure requests
|
||||
await ms.handle(cconn)
|
||||
except CatchableError as exc:
|
||||
debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn
|
||||
if not cconn.isUpgraded:
|
||||
cconn.upgradeFail(exc)
|
||||
finally:
|
||||
if not isNil(cconn):
|
||||
await cconn.close()
|
||||
|
||||
trace "Stopped secure handler", conn
|
||||
|
||||
method upgradeIncoming*(
|
||||
self: MuxedUpgrade,
|
||||
incomingConn: Connection): Future[void] {.async, gcsafe.} = # noraises
|
||||
## This is the upgrade flow to handle muxed
|
||||
## connections
|
||||
##
|
||||
trace "Upgrading incoming connection", incomingConn
|
||||
let ms = newMultistream()
|
||||
|
||||
# secure incoming connections
|
||||
proc securedHandler(conn: Connection,
|
||||
proto: string)
|
||||
{.async, gcsafe, closure.} =
|
||||
trace "Starting secure handler", conn
|
||||
let secure = u.secureManagers.filterIt(it.codec == proto)[0]
|
||||
|
||||
var cconn = conn
|
||||
try:
|
||||
var sconn = await secure.secure(cconn, false)
|
||||
if isNil(sconn):
|
||||
return
|
||||
|
||||
cconn = sconn
|
||||
# add the muxer
|
||||
for muxer in u.muxers.values:
|
||||
ms.addHandler(muxer.codecs, muxer)
|
||||
|
||||
# handle subsequent secure requests
|
||||
await ms.handle(cconn)
|
||||
except CatchableError as exc:
|
||||
debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn
|
||||
if not cconn.isUpgraded:
|
||||
cconn.upgrade(exc)
|
||||
finally:
|
||||
if not isNil(cconn):
|
||||
await cconn.close()
|
||||
|
||||
trace "Stopped secure handler", conn
|
||||
|
||||
try:
|
||||
if (await ms.select(incomingConn)): # just handshake
|
||||
# add the secure handlers
|
||||
for k in u.secureManagers:
|
||||
ms.addHandler(k.codec, securedHandler)
|
||||
for k in self.secureManagers:
|
||||
ms.addHandler(
|
||||
k.codec,
|
||||
proc(conn: Connection, proto: string): Future[void] =
|
||||
self.securedMuxableConnection(conn, proto, ms)
|
||||
)
|
||||
|
||||
# handle un-secured connections
|
||||
# we handshaked above, set this ms handler as active
|
||||
@@ -140,12 +171,14 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa
|
||||
except CatchableError as exc:
|
||||
debug "Exception upgrading incoming", exc = exc.msg
|
||||
if not incomingConn.isUpgraded:
|
||||
incomingConn.upgrade(exc)
|
||||
incomingConn.upgradeFail(exc)
|
||||
finally:
|
||||
if not isNil(incomingConn):
|
||||
await incomingConn.close()
|
||||
|
||||
proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} =
|
||||
proc muxerHandler(
|
||||
self: MuxedUpgrade,
|
||||
muxer: Muxer) {.async, gcsafe.} =
|
||||
let
|
||||
conn = muxer.connection
|
||||
|
||||
@@ -155,13 +188,13 @@ proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} =
|
||||
return
|
||||
|
||||
# store incoming connection
|
||||
u.connManager.storeConn(conn)
|
||||
self.connManager.storeConn(conn)
|
||||
|
||||
# store muxer and muxed connection
|
||||
u.connManager.storeMuxer(muxer)
|
||||
self.connManager.storeMuxer(muxer)
|
||||
|
||||
try:
|
||||
await u.identify(muxer)
|
||||
await self.identify(muxer)
|
||||
except IdentifyError as exc:
|
||||
# Identify is non-essential, though if it fails, it might indicate that
|
||||
# the connection was closed already - this will be picked up by the read
|
||||
@@ -193,17 +226,20 @@ proc init*(
|
||||
connManager: connManager,
|
||||
ms: ms)
|
||||
|
||||
upgrader.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises
|
||||
trace "Starting stream handler", conn
|
||||
try:
|
||||
await upgrader.ms.handle(conn) # handle incoming connection
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception in stream handler", conn, msg = exc.msg
|
||||
finally:
|
||||
await conn.closeWithEOF()
|
||||
trace "Stream handler done", conn
|
||||
proc streamHandler(conn: Connection) {.async, gcsafe.} = # noraises
|
||||
# trace "Starting stream handler", conn
|
||||
try:
|
||||
await upgrader.ms.handle(conn) # handle incoming connection
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
# trace "exception in stream handler", conn, msg = exc.msg
|
||||
discard
|
||||
finally:
|
||||
await conn.closeWithEOF()
|
||||
# trace "Stream handler done", conn
|
||||
|
||||
upgrader.streamHandler = streamHandler
|
||||
|
||||
for _, val in muxers:
|
||||
val.streamHandler = upgrader.streamHandler
|
||||
|
||||
@@ -20,8 +20,11 @@ export connmanager, connection, identify, secure, multistream
|
||||
|
||||
declarePublicCounter(libp2p_failed_upgrade, "peers failed upgrade")
|
||||
|
||||
logScope:
|
||||
topics = "libp2p upgrade"
|
||||
|
||||
type
|
||||
UpgradeFailedError* = object of CatchableError
|
||||
UpgradeFailedError* = object of LPError
|
||||
|
||||
Upgrade* = ref object of RootObj
|
||||
ms*: MultistreamSelect
|
||||
@@ -29,22 +32,28 @@ type
|
||||
connManager*: ConnManager
|
||||
secureManagers*: seq[Secure]
|
||||
|
||||
method upgradeIncoming*(u: Upgrade, conn: Connection): Future[void] {.base.} =
|
||||
method upgradeIncoming*(
|
||||
self: Upgrade,
|
||||
conn: Connection): Future[void] {.base.} =
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
method upgradeOutgoing*(u: Upgrade, conn: Connection): Future[Connection] {.base.} =
|
||||
method upgradeOutgoing*(
|
||||
self: Upgrade,
|
||||
conn: Connection): Future[Connection] {.base.} =
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
proc secure*(u: Upgrade, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||
if u.secureManagers.len <= 0:
|
||||
proc secure*(
|
||||
self: Upgrade,
|
||||
conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||
if self.secureManagers.len <= 0:
|
||||
raise newException(UpgradeFailedError, "No secure managers registered!")
|
||||
|
||||
let codec = await u.ms.select(conn, u.secureManagers.mapIt(it.codec))
|
||||
let codec = await self.ms.select(conn, self.secureManagers.mapIt(it.codec))
|
||||
if codec.len == 0:
|
||||
raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!")
|
||||
|
||||
trace "Securing connection", conn, codec
|
||||
let secureProtocol = u.secureManagers.filterIt(it.codec == codec)
|
||||
let secureProtocol = self.secureManagers.filterIt(it.codec == codec)
|
||||
|
||||
# ms.select should deal with the correctness of this
|
||||
# let's avoid duplicating checks but detect if it fails to do it properly
|
||||
@@ -52,11 +61,13 @@ proc secure*(u: Upgrade, conn: Connection): Future[Connection] {.async, gcsafe.}
|
||||
|
||||
return await secureProtocol[0].secure(conn, true)
|
||||
|
||||
proc identify*(u: Upgrade, conn: Connection) {.async, gcsafe.} =
|
||||
proc identify*(
|
||||
self: Upgrade,
|
||||
conn: Connection) {.async, gcsafe.} =
|
||||
## identify the connection
|
||||
|
||||
if (await u.ms.select(conn, u.identity.codec)):
|
||||
let info = await u.identity.identify(conn, conn.peerInfo)
|
||||
if (await self.ms.select(conn, self.identity.codec)):
|
||||
let info = await self.identity.identify(conn, conn.peerInfo)
|
||||
|
||||
if info.pubKey.isNone and isNil(conn):
|
||||
raise newException(UpgradeFailedError,
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
--path:".."
|
||||
--threads:on
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
{.used.}
|
||||
|
||||
import unittest, sequtils, options, tables
|
||||
import unittest, sequtils, options, tables, sets
|
||||
import chronos, stew/byteutils
|
||||
import utils,
|
||||
../../libp2p/[errors,
|
||||
@@ -363,7 +363,18 @@ suite "FloodSub":
|
||||
pubs &= nodes[i].publish("foobar", ("Hello!" & $i).toBytes())
|
||||
await allFuturesThrowing(pubs)
|
||||
|
||||
# wait the test task
|
||||
await allFuturesThrowing(futs.mapIt(it[0]))
|
||||
|
||||
# test calling unsubscribeAll for coverage
|
||||
for node in nodes:
|
||||
node.unsubscribeAll("foobar")
|
||||
check:
|
||||
# we keep the peers in table
|
||||
FloodSub(node).floodsub["foobar"].len == 9
|
||||
# remove the topic tho
|
||||
node.topics.len == 0
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes.mapIt(
|
||||
allFutures(
|
||||
|
||||
@@ -8,7 +8,8 @@ import ../libp2p/[protocols/identify,
|
||||
multistream,
|
||||
transports/transport,
|
||||
transports/tcptransport,
|
||||
crypto/crypto]
|
||||
crypto/crypto,
|
||||
upgrademngrs/upgrade]
|
||||
import ./helpers
|
||||
|
||||
when defined(nimHasUsed): {.used.}
|
||||
@@ -38,8 +39,8 @@ suite "Identify":
|
||||
remotePeerInfo = PeerInfo.init(
|
||||
remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"])
|
||||
|
||||
transport1 = TcpTransport.init()
|
||||
transport2 = TcpTransport.init()
|
||||
transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
transport2 = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
identifyProto1 = newIdentify(remotePeerInfo)
|
||||
identifyProto2 = newIdentify(remotePeerInfo)
|
||||
|
||||
@@ -9,6 +9,7 @@ import ../libp2p/[errors,
|
||||
muxers/mplex/mplex,
|
||||
muxers/mplex/coder,
|
||||
muxers/mplex/lpchannel,
|
||||
upgrademngrs/upgrade,
|
||||
vbuffer,
|
||||
varint]
|
||||
|
||||
@@ -111,7 +112,7 @@ suite "Mplex":
|
||||
|
||||
var data = newSeq[byte](6)
|
||||
await chann.close() # closing channel
|
||||
# should be able to read on local clsoe
|
||||
# should be able to read on local close
|
||||
await chann.readExactly(addr data[0], 3)
|
||||
# closing remote end
|
||||
let closeFut = chann.pushEof()
|
||||
@@ -379,7 +380,7 @@ suite "Mplex":
|
||||
asyncTest "read/write receiver":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -395,7 +396,7 @@ suite "Mplex":
|
||||
await mplexListen.close()
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -416,7 +417,7 @@ suite "Mplex":
|
||||
asyncTest "read/write receiver lazy":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -432,7 +433,7 @@ suite "Mplex":
|
||||
await mplexListen.close()
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -460,7 +461,7 @@ suite "Mplex":
|
||||
for _ in 0..<MaxMsgSize:
|
||||
bigseq.add(uint8(rand(uint('A')..uint('z'))))
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -484,7 +485,7 @@ suite "Mplex":
|
||||
check false
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -507,7 +508,7 @@ suite "Mplex":
|
||||
asyncTest "read/write initiator":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -521,7 +522,7 @@ suite "Mplex":
|
||||
await mplexListen.handle()
|
||||
await mplexListen.close()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
@@ -543,7 +544,7 @@ suite "Mplex":
|
||||
asyncTest "multiple streams":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1 = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
let done = newFuture[void]()
|
||||
@@ -563,7 +564,7 @@ suite "Mplex":
|
||||
await mplexListen.handle()
|
||||
await mplexListen.close()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
@@ -587,7 +588,7 @@ suite "Mplex":
|
||||
asyncTest "multiple read/write streams":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
let done = newFuture[void]()
|
||||
@@ -608,7 +609,7 @@ suite "Mplex":
|
||||
await mplexListen.handle()
|
||||
await mplexListen.close()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
@@ -634,7 +635,7 @@ suite "Mplex":
|
||||
asyncTest "channel closes listener with EOF":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1 = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
var listenStreams: seq[Connection]
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
let conn = await transport1.accept()
|
||||
@@ -656,7 +657,7 @@ suite "Mplex":
|
||||
|
||||
await transport1.start(ma)
|
||||
let acceptFut = acceptHandler()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -681,7 +682,7 @@ suite "Mplex":
|
||||
|
||||
asyncTest "channel closes dialer with EOF":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let transport1 = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
var count = 0
|
||||
var done = newFuture[void]()
|
||||
@@ -704,7 +705,7 @@ suite "Mplex":
|
||||
await transport1.start(ma)
|
||||
let acceptFut = acceptHandler()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -746,7 +747,7 @@ suite "Mplex":
|
||||
|
||||
asyncTest "dialing mplex closes both ends":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let transport1 = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
var listenStreams: seq[Connection]
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -763,7 +764,7 @@ suite "Mplex":
|
||||
await transport1.start(ma)
|
||||
let acceptFut = acceptHandler()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -788,7 +789,7 @@ suite "Mplex":
|
||||
|
||||
asyncTest "listening mplex closes both ends":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let transport1 = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
var mplexListen: Mplex
|
||||
var listenStreams: seq[Connection]
|
||||
@@ -806,7 +807,7 @@ suite "Mplex":
|
||||
await transport1.start(ma)
|
||||
let acceptFut = acceptHandler()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -831,7 +832,7 @@ suite "Mplex":
|
||||
|
||||
asyncTest "canceling mplex handler closes both ends":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let transport1 = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
var mplexHandle: Future[void]
|
||||
var listenStreams: seq[Connection]
|
||||
@@ -850,7 +851,7 @@ suite "Mplex":
|
||||
await transport1.start(ma)
|
||||
let acceptFut = acceptHandler()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -874,7 +875,7 @@ suite "Mplex":
|
||||
|
||||
asyncTest "closing dialing connection should close both ends":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let transport1 = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
var listenStreams: seq[Connection]
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -891,7 +892,7 @@ suite "Mplex":
|
||||
await transport1.start(ma)
|
||||
let acceptFut = acceptHandler()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -916,7 +917,7 @@ suite "Mplex":
|
||||
|
||||
asyncTest "canceling listening connection should close both ends":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let transport1 = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
var listenConn: Connection
|
||||
var listenStreams: seq[Connection]
|
||||
@@ -934,7 +935,7 @@ suite "Mplex":
|
||||
await transport1.start(ma)
|
||||
let acceptFut = acceptHandler()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let mplexDial = Mplex.init(conn)
|
||||
@@ -962,7 +963,7 @@ suite "Mplex":
|
||||
asyncTest "channel should be able to handle erratic read/writes":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
var complete = newFuture[void]()
|
||||
@@ -983,7 +984,7 @@ suite "Mplex":
|
||||
await mplexListen.handle()
|
||||
await mplexListen.close()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
@@ -1034,7 +1035,7 @@ suite "Mplex":
|
||||
asyncTest "channel should handle 1 byte read/write":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
var complete = newFuture[void]()
|
||||
@@ -1052,7 +1053,7 @@ suite "Mplex":
|
||||
await mplexListen.handle()
|
||||
await mplexListen.close()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
|
||||
@@ -7,7 +7,8 @@ import ../libp2p/errors,
|
||||
../libp2p/multiaddress,
|
||||
../libp2p/transports/transport,
|
||||
../libp2p/transports/tcptransport,
|
||||
../libp2p/protocols/protocol
|
||||
../libp2p/protocols/protocol,
|
||||
../libp2p/upgrademngrs/upgrade
|
||||
|
||||
import ./helpers
|
||||
|
||||
@@ -247,7 +248,7 @@ suite "Multistream select":
|
||||
let msListen = newMultistream()
|
||||
msListen.addHandler("/test/proto/1.0.0", protocol)
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport1.start(ma)
|
||||
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
@@ -258,7 +259,7 @@ suite "Multistream select":
|
||||
let handlerWait = acceptHandler()
|
||||
|
||||
let msDial = newMultistream()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2 = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
check (await msDial.select(conn, "/test/proto/1.0.0")) == true
|
||||
@@ -294,7 +295,7 @@ suite "Multistream select":
|
||||
msListen.addHandler("/test/proto1/1.0.0", protocol)
|
||||
msListen.addHandler("/test/proto2/1.0.0", protocol)
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let listenFut = transport1.start(ma)
|
||||
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
@@ -310,7 +311,7 @@ suite "Multistream select":
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
let msDial = newMultistream()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
let ls = await msDial.list(conn)
|
||||
@@ -339,7 +340,7 @@ suite "Multistream select":
|
||||
let msListen = newMultistream()
|
||||
msListen.addHandler("/test/proto/1.0.0", protocol)
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport1.start(ma)
|
||||
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
@@ -348,7 +349,7 @@ suite "Multistream select":
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
let msDial = newMultistream()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
check (await msDial.select(conn,
|
||||
@@ -377,7 +378,7 @@ suite "Multistream select":
|
||||
msListen.addHandler("/test/proto1/1.0.0", protocol)
|
||||
msListen.addHandler("/test/proto2/1.0.0", protocol)
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport1.start(ma)
|
||||
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
@@ -386,7 +387,7 @@ suite "Multistream select":
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
let msDial = newMultistream()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
check (await msDial.select(conn,
|
||||
|
||||
@@ -29,7 +29,9 @@ import ../libp2p/[switch,
|
||||
muxers/mplex/mplex,
|
||||
protocols/secure/noise,
|
||||
protocols/secure/secio,
|
||||
protocols/secure/secure]
|
||||
protocols/secure/secure,
|
||||
upgrademngrs/muxedupgrade,
|
||||
connmanager]
|
||||
import ./helpers
|
||||
|
||||
const
|
||||
@@ -51,23 +53,31 @@ method init(p: TestProto) {.gcsafe.} =
|
||||
proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switch, PeerInfo) =
|
||||
var peerInfo: PeerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
|
||||
peerInfo.addrs.add(ma)
|
||||
let identify = newIdentify(peerInfo)
|
||||
|
||||
proc createMplex(conn: Connection): Muxer =
|
||||
result = Mplex.init(conn)
|
||||
|
||||
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
||||
let transports = @[Transport(TcpTransport.init())]
|
||||
let muxers = [(MplexCodec, mplexProvider)].toTable()
|
||||
let secureManagers = if secio:
|
||||
let
|
||||
identify = newIdentify(peerInfo)
|
||||
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
||||
muxers = [(MplexCodec, mplexProvider)].toTable()
|
||||
secureManagers = if secio:
|
||||
[Secure(newSecio(rng, peerInfo.privateKey))]
|
||||
else:
|
||||
[Secure(newNoise(rng, peerInfo.privateKey, outgoing = outgoing))]
|
||||
let switch = newSwitch(peerInfo,
|
||||
transports,
|
||||
identify,
|
||||
muxers,
|
||||
secureManagers)
|
||||
connManager = ConnManager.init()
|
||||
ms = newMultistream()
|
||||
muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagers, connManager, ms)
|
||||
transports = @[Transport(TcpTransport.init(upgrade = muxedUpgrade))]
|
||||
|
||||
let switch = newSwitch(
|
||||
peerInfo,
|
||||
transports,
|
||||
identify,
|
||||
muxers,
|
||||
secureManagers,
|
||||
connManager,
|
||||
ms)
|
||||
result = (switch, peerInfo)
|
||||
|
||||
suite "Noise":
|
||||
@@ -80,7 +90,7 @@ suite "Noise":
|
||||
serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server])
|
||||
serverNoise = newNoise(rng, serverInfo.privateKey, outgoing = false)
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport1.start(server)
|
||||
|
||||
proc acceptHandler() {.async.} =
|
||||
@@ -94,7 +104,7 @@ suite "Noise":
|
||||
|
||||
let
|
||||
acceptFut = acceptHandler()
|
||||
transport2: TcpTransport = TcpTransport.init()
|
||||
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
|
||||
clientNoise = newNoise(rng, clientInfo.privateKey, outgoing = true)
|
||||
conn = await transport2.dial(transport1.ma)
|
||||
@@ -118,7 +128,7 @@ suite "Noise":
|
||||
serverNoise = newNoise(rng, serverInfo.privateKey, outgoing = false)
|
||||
|
||||
let
|
||||
transport1: TcpTransport = TcpTransport.init()
|
||||
transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
asyncCheck transport1.start(server)
|
||||
|
||||
@@ -134,7 +144,7 @@ suite "Noise":
|
||||
|
||||
let
|
||||
handlerWait = acceptHandler()
|
||||
transport2: TcpTransport = TcpTransport.init()
|
||||
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
|
||||
clientNoise = newNoise(rng, clientInfo.privateKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8])
|
||||
conn = await transport2.dial(transport1.ma)
|
||||
@@ -154,7 +164,7 @@ suite "Noise":
|
||||
serverNoise = newNoise(rng, serverInfo.privateKey, outgoing = false)
|
||||
readTask = newFuture[void]()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport1.start(server)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -170,7 +180,7 @@ suite "Noise":
|
||||
|
||||
let
|
||||
acceptFut = acceptHandler()
|
||||
transport2: TcpTransport = TcpTransport.init()
|
||||
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
|
||||
clientNoise = newNoise(rng, clientInfo.privateKey, outgoing = true)
|
||||
conn = await transport2.dial(transport1.ma)
|
||||
@@ -195,7 +205,7 @@ suite "Noise":
|
||||
trace "Sending huge payload", size = hugePayload.len
|
||||
|
||||
let
|
||||
transport1: TcpTransport = TcpTransport.init()
|
||||
transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
listenFut = transport1.start(server)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -209,7 +219,7 @@ suite "Noise":
|
||||
|
||||
let
|
||||
acceptFut = acceptHandler()
|
||||
transport2: TcpTransport = TcpTransport.init()
|
||||
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
|
||||
clientNoise = newNoise(rng, clientInfo.privateKey, outgoing = true)
|
||||
conn = await transport2.dial(transport1.ma)
|
||||
@@ -278,89 +288,3 @@ suite "Noise":
|
||||
switch2.stop())
|
||||
|
||||
await allFuturesThrowing(awaiters)
|
||||
|
||||
# test "interop with rust noise":
|
||||
# when true: # disable cos in CI we got no interop server/client
|
||||
# proc testListenerDialer(): Future[bool] {.async.} =
|
||||
# const
|
||||
# proto = "/noise/xx/25519/chachapoly/sha256/0.1.0"
|
||||
|
||||
# let
|
||||
# local = Multiaddress.init("/ip4/0.0.0.0/tcp/23456")
|
||||
# info = PeerInfo.init(PrivateKey.random(ECDSA), [local])
|
||||
# noise = newNoise(info.privateKey)
|
||||
# ms = newMultistream()
|
||||
# transport = TcpTransport.newTransport()
|
||||
|
||||
# proc connHandler(conn: Connection) {.async, gcsafe.} =
|
||||
# try:
|
||||
# await ms.handle(conn)
|
||||
# trace "ms.handle exited"
|
||||
# except:
|
||||
# error getCurrentExceptionMsg()
|
||||
# finally:
|
||||
# await conn.close()
|
||||
|
||||
# ms.addHandler(proto, noise)
|
||||
|
||||
# let
|
||||
# clientConn = await transport.listen(local, connHandler)
|
||||
# await clientConn
|
||||
|
||||
# result = true
|
||||
|
||||
# check:
|
||||
# waitFor(testListenerDialer()) == true
|
||||
|
||||
# test "interop with rust noise":
|
||||
# when true: # disable cos in CI we got no interop server/client
|
||||
# proc testListenerDialer(): Future[bool] {.async.} =
|
||||
# const
|
||||
# proto = "/noise/xx/25519/chachapoly/sha256/0.1.0"
|
||||
|
||||
# let
|
||||
# local = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
||||
# remote = Multiaddress.init("/ip4/127.0.0.1/tcp/23456")
|
||||
# info = PeerInfo.init(PrivateKey.random(ECDSA), [local])
|
||||
# noise = newNoise(info.privateKey)
|
||||
# ms = newMultistream()
|
||||
# transport = TcpTransport.newTransport()
|
||||
# conn = await transport.dial(remote)
|
||||
|
||||
# check ms.select(conn, @[proto]).await == proto
|
||||
|
||||
# let
|
||||
# sconn = await noise.secure(conn, true)
|
||||
|
||||
# # use sconn
|
||||
|
||||
# result = true
|
||||
|
||||
# check:
|
||||
# waitFor(testListenerDialer()) == true
|
||||
|
||||
# test "interop with go noise":
|
||||
# when true: # disable cos in CI we got no interop server/client
|
||||
# proc testListenerDialer(): Future[bool] {.async.} =
|
||||
# let
|
||||
# local = Multiaddress.init("/ip4/0.0.0.0/tcp/23456")
|
||||
# info = PeerInfo.init(PrivateKey.random(ECDSA), [local])
|
||||
# noise = newNoise(info.privateKey)
|
||||
# ms = newMultistream()
|
||||
# transport = TcpTransport.newTransport()
|
||||
|
||||
# proc connHandler(conn: Connection) {.async, gcsafe.} =
|
||||
# try:
|
||||
# let seconn = await noise.secure(conn, false)
|
||||
# trace "ms.handle exited"
|
||||
# finally:
|
||||
# await conn.close()
|
||||
|
||||
# let
|
||||
# clientConn = await transport.listen(local, connHandler)
|
||||
# await clientConn
|
||||
|
||||
# result = true
|
||||
|
||||
# check:
|
||||
# waitFor(testListenerDialer()) == true
|
||||
|
||||
@@ -47,10 +47,10 @@ suite "Switch":
|
||||
testProto.codec = TestCodec
|
||||
testProto.handler = handle
|
||||
|
||||
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
|
||||
let switch1 = newStandardSwitch()
|
||||
switch1.mount(testProto)
|
||||
|
||||
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
|
||||
let switch2 = newStandardSwitch()
|
||||
var awaiters: seq[Future[void]]
|
||||
awaiters.add(await switch1.start())
|
||||
awaiters.add(await switch2.start())
|
||||
@@ -620,7 +620,7 @@ suite "Switch":
|
||||
asyncTest "e2e canceling dial should not leak":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport = TcpTransport.init()
|
||||
let transport = TcpTransport.init(upgrade = Upgrade())
|
||||
await transport.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -656,7 +656,7 @@ suite "Switch":
|
||||
asyncTest "e2e closing remote conn should not leak":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport = TcpTransport.init()
|
||||
let transport = TcpTransport.init(upgrade = Upgrade())
|
||||
await transport.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
|
||||
@@ -5,6 +5,7 @@ import chronos, stew/byteutils
|
||||
import ../libp2p/[stream/connection,
|
||||
transports/transport,
|
||||
transports/tcptransport,
|
||||
upgrademngrs/upgrade,
|
||||
multiaddress,
|
||||
errors,
|
||||
wire]
|
||||
@@ -17,7 +18,7 @@ suite "TCP transport":
|
||||
|
||||
asyncTest "test listener: handle write":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let transport: TcpTransport = TcpTransport.init()
|
||||
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -39,7 +40,7 @@ suite "TCP transport":
|
||||
asyncTest "test listener: handle read":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport: TcpTransport = TcpTransport.init()
|
||||
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -77,7 +78,7 @@ suite "TCP transport":
|
||||
server.start()
|
||||
|
||||
let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()).tryGet()
|
||||
let transport: TcpTransport = TcpTransport.init()
|
||||
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport.dial(ma)
|
||||
var msg = newSeq[byte](6)
|
||||
await conn.readExactly(addr msg[0], 6)
|
||||
@@ -111,7 +112,7 @@ suite "TCP transport":
|
||||
server.start()
|
||||
|
||||
let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()).tryGet()
|
||||
let transport: TcpTransport = TcpTransport.init()
|
||||
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport.dial(ma)
|
||||
await conn.write("Hello!")
|
||||
|
||||
@@ -127,7 +128,7 @@ suite "TCP transport":
|
||||
asyncTest "e2e: handle write":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
await transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -137,7 +138,7 @@ suite "TCP transport":
|
||||
|
||||
let handlerWait = acceptHandler()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
var msg = newSeq[byte](6)
|
||||
await conn.readExactly(addr msg[0], 6)
|
||||
@@ -152,7 +153,7 @@ suite "TCP transport":
|
||||
|
||||
asyncTest "e2e: handle read":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
@@ -164,7 +165,7 @@ suite "TCP transport":
|
||||
|
||||
let handlerWait = acceptHandler()
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
await conn.write("Hello!")
|
||||
|
||||
@@ -177,10 +178,10 @@ suite "TCP transport":
|
||||
asyncTest "e2e: handle dial cancellation":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
await transport1.start(ma)
|
||||
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let cancellation = transport2.dial(transport1.ma)
|
||||
|
||||
await cancellation.cancelAndWait()
|
||||
@@ -192,7 +193,7 @@ suite "TCP transport":
|
||||
asyncTest "e2e: handle accept cancellation":
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
let transport1: TcpTransport = TcpTransport.init()
|
||||
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
await transport1.start(ma)
|
||||
|
||||
let acceptHandler = transport1.accept()
|
||||
|
||||
104
tests/testupgrade.nim
Normal file
104
tests/testupgrade.nim
Normal file
@@ -0,0 +1,104 @@
|
||||
import unittest, tables
|
||||
import chronos
|
||||
|
||||
import ../libp2p
|
||||
import ../libp2p/upgrademngrs/muxedupgrade
|
||||
|
||||
import ./helpers
|
||||
|
||||
proc createMuxedManager(
|
||||
seckey = PrivateKey.random(rng[]).tryGet(),
|
||||
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()): Upgrade =
|
||||
|
||||
proc createMplex(conn: Connection): Muxer =
|
||||
Mplex.init(conn)
|
||||
|
||||
let
|
||||
peerInfo = PeerInfo.init(seckey, [ma])
|
||||
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
||||
|
||||
return MuxedUpgrade.init(
|
||||
newIdentify(peerInfo),
|
||||
{MplexCodec: mplexProvider}.toTable,
|
||||
[newNoise(rng, seckey).Secure],
|
||||
ConnManager.init(),
|
||||
newMultistream())
|
||||
|
||||
suite "Test Upgrade Managers":
|
||||
|
||||
asyncTest "Test Identify flow":
|
||||
let
|
||||
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerInfo = PeerInfo.init(seckey, [ma])
|
||||
upgrade = createMuxedManager()
|
||||
transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
transport2 = TcpTransport.init(upgrade = upgrade)
|
||||
identifyProto = newIdentify(peerInfo)
|
||||
msListen = newMultistream()
|
||||
|
||||
peerInfo.agentVersion = AgentVersion
|
||||
peerInfo.protoVersion = ProtoVersion
|
||||
|
||||
msListen.addHandler(IdentifyCodec, identifyProto)
|
||||
let serverFut = transport1.start(ma)
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
let c = await transport1.accept()
|
||||
await msListen.handle(c)
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
check isNil(conn.peerInfo)
|
||||
await upgrade.identify(conn)
|
||||
|
||||
check:
|
||||
not isNil(conn.peerInfo)
|
||||
conn.peerInfo.peerId == peerInfo.peerId
|
||||
conn.peerInfo.addrs == peerInfo.addrs
|
||||
conn.peerInfo.agentVersion == peerInfo.agentVersion
|
||||
conn.peerInfo.protoVersion == peerInfo.protoVersion
|
||||
conn.peerInfo.protocols == peerInfo.protocols
|
||||
|
||||
await allFuturesThrowing(
|
||||
transport1.stop(),
|
||||
transport2.stop(),
|
||||
acceptFut,
|
||||
serverFut
|
||||
)
|
||||
|
||||
asyncTest "Test Secure flow":
|
||||
let
|
||||
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerInfo = PeerInfo.init(seckey, [ma])
|
||||
upgrade = createMuxedManager()
|
||||
transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
transport2 = TcpTransport.init(upgrade = upgrade)
|
||||
secure = newNoise(rng, seckey).Secure
|
||||
msListen = newMultistream()
|
||||
|
||||
peerInfo.agentVersion = AgentVersion
|
||||
peerInfo.protoVersion = ProtoVersion
|
||||
|
||||
msListen.addHandler(NoiseCodec, secure)
|
||||
let serverFut = transport1.start(ma)
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
let c = await transport1.accept()
|
||||
await msListen.handle(c)
|
||||
|
||||
let acceptFut = acceptHandler()
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
check isNil(conn.peerInfo)
|
||||
let sconn = await upgrade.secure(conn)
|
||||
check:
|
||||
not isNil(sconn)
|
||||
sconn.transportDir == Direction.Out
|
||||
|
||||
await allFuturesThrowing(
|
||||
transport1.stop(),
|
||||
transport2.stop(),
|
||||
acceptFut,
|
||||
serverFut
|
||||
)
|
||||
Reference in New Issue
Block a user