Mirror of https://github.com/vacp2p/nim-libp2p.git (synced 2026-01-10 12:58:05 -05:00)

Compare commits: v1.2.0...anyaddr-re (29 commits)
Commits:

fdff1ebec5, d34575675c, 03f72a8b5c, b11e2b0349, 2fa2c4425f, e228981a11,
907c41a491, 03668a3e90, 980950b147, eb6a9f2ff1, 7810dba9d6, 6d327c37c4,
21cb13a8ff, 1b2b009f79, 3a659ffddb, ac994a8f15, ee8318ec42, 52a8870f78,
0911cb20f4, d7c0486968, 63b6390d1a, cf7b77bf82, 3ca49a2f40, 1b91b97499,
21cbe3a91a, 88e233db81, 84659af45b, aef44ed1ce, 02c96fc003
.github/workflows/bumper.yml (vendored, 3 changed lines)

@@ -2,8 +2,7 @@ name: Bumper
on:
  push:
    branches:
      - unstable
      - bumper
      - master
  workflow_dispatch:

jobs:

@@ -7,7 +7,6 @@ if dirExists("nimbledeps/pkgs2"):
switch("warning", "CaseTransition:off")
switch("warning", "ObservableStores:off")
switch("warning", "LockLevel:off")
--define:chronosStrictException
--styleCheck:usages
switch("warningAsError", "UseBase:on")
--styleCheck:error
di/di.nim (new file, 24 lines)

@@ -0,0 +1,24 @@
import typetraits
import tables

type
  BindingKey = tuple[typeName: string, qualifier: string]

  Container* = ref object
    bindings*: Table[BindingKey, proc(): RootRef {.gcsafe, raises: [].}]

  BindingNotFoundError* = object of CatchableError

proc register*[T](c: Container, implementation: proc(): T {.gcsafe, raises: [].}, qualifier: string = "") =
  let key: BindingKey = (name(T), qualifier)
  proc p(): RootRef =
    let o: RootRef = implementation()
    return o
  c.bindings[key] = p

proc resolve*[T](c: Container, qualifier: string = ""): T {.raises: [BindingNotFoundError]} =
  let key: BindingKey = (name(T), qualifier)
  try:
    return cast[T](c.bindings[key]())
  except KeyError:
    raise newException(BindingNotFoundError, "Type not bound: " & name(T))
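
A minimal usage sketch of this container (the MyService type and its factory are hypothetical and the import path is illustrative; register, resolve, Container and BindingNotFoundError are the symbols defined above):

  import di/di

  type MyService = ref object of RootObj
    name*: string

  let container = Container()

  # Bind a factory for MyService under the default ("") qualifier.
  register[MyService](container, proc(): MyService {.gcsafe, raises: [].} =
    MyService(name: "default"))

  # resolve invokes the registered factory; an unbound type raises BindingNotFoundError.
  let svc = resolve[MyService](container)
  assert svc.name == "default"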
@@ -19,7 +19,8 @@ runnableExamples:
{.push raises: [].}

import
  options, tables, chronos, chronicles, sequtils,
  options, tables, chronos, chronicles, sequtils
import
  switch, peerid, peerinfo, stream/connection, multiaddress,
  crypto/crypto, transports/[transport, tcptransport],
  muxers/[muxer, mplex/mplex, yamux/yamux],
@@ -28,6 +29,8 @@ import
  connmanager, upgrademngrs/muxedupgrade, observedaddrmanager,
  nameresolving/nameresolver,
  errors, utility
import services/wildcardresolverservice
import ../di/di

export
  switch, peerid, peerinfo, connection, multiaddress, crypto, errors
@@ -59,6 +62,8 @@ type
    rdv: RendezVous
    services: seq[Service]
    observedAddrManager: ObservedAddrManager
    enableWildcardResolver: bool
    container*: Container

proc new*(T: type[SwitchBuilder]): T {.public.} =
  ## Creates a SwitchBuilder
@@ -67,7 +72,7 @@ proc new*(T: type[SwitchBuilder]): T {.public.} =
    .init("/ip4/127.0.0.1/tcp/0")
    .expect("Should initialize to default")

  SwitchBuilder(
  let sb = SwitchBuilder(
    privKey: none(PrivateKey),
    addresses: @[address],
    secureManagers: @[],
@@ -76,7 +81,12 @@ proc new*(T: type[SwitchBuilder]): T {.public.} =
    maxOut: -1,
    maxConnsPerPeer: MaxConnectionsPerPeer,
    protoVersion: ProtoVersion,
    agentVersion: AgentVersion)
    agentVersion: AgentVersion,
    container: Container())

  register[NetworkInterfaceProvider](sb.container, networkInterfaceProvider)

  sb

proc withPrivateKey*(b: SwitchBuilder, privateKey: PrivateKey): SwitchBuilder {.public.} =
  ## Set the private key of the switch. Will be used to
@@ -85,20 +95,19 @@ proc withPrivateKey*(b: SwitchBuilder, privateKey: PrivateKey): SwitchBuilder {.
  b.privKey = some(privateKey)
  b

proc withAddress*(b: SwitchBuilder, address: MultiAddress): SwitchBuilder {.public.} =
  ## | Set the listening address of the switch
  ## | Calling it multiple times will override the value

  b.addresses = @[address]
  b

proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress]): SwitchBuilder {.public.} =
proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress], enableWildcardResolver: bool = true): SwitchBuilder {.public.} =
  ## | Set the listening addresses of the switch
  ## | Calling it multiple times will override the value

  b.addresses = addresses
  b.enableWildcardResolver = enableWildcardResolver
  b

proc withAddress*(b: SwitchBuilder, address: MultiAddress, enableWildcardResolver: bool = true): SwitchBuilder {.public.} =
  ## | Set the listening address of the switch
  ## | Calling it multiple times will override the value
  b.withAddresses(@[address], enableWildcardResolver)


proc withSignedPeerRecord*(b: SwitchBuilder, sendIt = true): SwitchBuilder {.public.} =
  b.sendSignedPeerRecord = sendIt
  b
@@ -209,6 +218,10 @@ proc withObservedAddrManager*(b: SwitchBuilder, observedAddrManager: ObservedAdd
  b.observedAddrManager = observedAddrManager
  b

proc withBinding*[T](b: SwitchBuilder, binding: proc(): T {.gcsafe, raises: [].}): SwitchBuilder =
  register[T](b.container, binding)
  b

proc build*(b: SwitchBuilder): Switch
  {.raises: [LPError], public.} =

@@ -261,6 +274,12 @@ proc build*(b: SwitchBuilder): Switch
    else:
      PeerStore.new(identify)

  try:
    let networkInterfaceProvider = resolve[NetworkInterfaceProvider](b.container)
    b.services.add(WildcardAddressResolverService.new(networkInterfaceProvider))
  except BindingNotFoundError as e:
    raise newException(LPError, "Cannot resolve NetworkInterfaceProvider", e)

  let switch = newSwitch(
    peerInfo = peerInfo,
    transports = transports,
@@ -312,7 +331,7 @@ proc newStandardSwitch*(
  let addrs = when addrs is MultiAddress: @[addrs] else: addrs
  var b = SwitchBuilder
    .new()
    .withAddresses(addrs)
    .withAddresses(addrs, true)
    .withRng(rng)
    .withSignedPeerRecord(sendSignedPeerRecord)
    .withMaxConnections(maxConnections)

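A sketch of how the new builder pieces compose (withAddresses and withBinding are the procs added above; withRng, withTcpTransport, withMplex, withNoise, newRng and MultiAddress.init are existing libp2p APIs assumed here; MockInterfaceProvider is a hypothetical NetworkInterfaceProvider subtype for tests):

  let switch = SwitchBuilder.new()
    .withAddresses(
      @[MultiAddress.init("/ip4/0.0.0.0/tcp/4001").expect("valid address")],
      enableWildcardResolver = true)
    .withRng(newRng())
    .withTcpTransport()
    .withMplex()
    .withNoise()
    # Optionally override the NetworkInterfaceProvider binding registered in new(),
    # e.g. to inject a test double:
    .withBinding(proc(): NetworkInterfaceProvider {.gcsafe, raises: [].} =
      MockInterfaceProvider())
    .build()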
@@ -81,16 +81,18 @@ proc dialAndUpgrade(
      if dialed.dir != dir:
        dialed.dir = dir
      await transport.upgrade(dialed, peerId)
    except CancelledError as exc:
      await dialed.close()
      raise exc
    except CatchableError as exc:
      # If we failed to establish the connection through one transport,
      # we won't succeed through another - no use in trying again
      await dialed.close()
      debug "Connection upgrade failed", err = exc.msg, peerId = peerId.get(default(PeerId))
      if exc isnot CancelledError:
        if dialed.dir == Direction.Out:
          libp2p_failed_upgrades_outgoing.inc()
        else:
          libp2p_failed_upgrades_incoming.inc()
      if dialed.dir == Direction.Out:
        libp2p_failed_upgrades_outgoing.inc()
      else:
        libp2p_failed_upgrades_incoming.inc()

      # Try other address
      return nil

@@ -44,12 +44,3 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
      # We still don't abort but warn
      debug "A future has failed, enable trace logging for details", error=exc.name
      trace "Exception details", msg=exc.msg

template tryAndWarn*(message: static[string]; body: untyped): untyped =
  try:
    body
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    debug "An exception has occurred, enable trace logging for details", name = exc.name, msg = message
    trace "Exception details", exc = exc.msg

@@ -13,12 +13,12 @@
{.push public.}

import pkg/chronos, chronicles
import std/[nativesockets, hashes]
import tables, strutils, sets, stew/shims/net
import std/[nativesockets, net, hashes]
import tables, strutils, sets
import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
  protobuf/minprotobuf, errors, utility
import stew/[base58, base32, endians2, results]
export results, minprotobuf, vbuffer, utility
export results, minprotobuf, vbuffer, utility, multicodec

logScope:
  topics = "libp2p multiaddress"

@@ -266,6 +266,7 @@ proc addHandler*[E](
|
||||
|
||||
proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
|
||||
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([])`
|
||||
# TODO https://github.com/nim-lang/Nim/issues/23445
|
||||
var futs = newSeqOfCap[Future[void].Raising([CancelledError])](m.handlers.len)
|
||||
for it in m.handlers:
|
||||
futs.add it.protocol.start()
|
||||
@@ -278,7 +279,7 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
|
||||
doAssert m.handlers.len == futs.len, "Handlers modified while starting"
|
||||
for i, fut in futs:
|
||||
if not fut.finished:
|
||||
pending.add noCancel fut.cancelAndWait()
|
||||
pending.add fut.cancelAndWait()
|
||||
elif fut.completed:
|
||||
pending.add m.handlers[i].protocol.stop()
|
||||
else:
|
||||
@@ -286,7 +287,6 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
|
||||
await noCancel allFutures(pending)
|
||||
raise exc
|
||||
|
||||
|
||||
proc stop*(m: MultistreamSelect) {.async: (raises: []).} =
|
||||
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])`
|
||||
var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len)
|
||||
|
||||
@@ -164,7 +164,6 @@ type
|
||||
closedRemotely: Future[void].Raising([])
|
||||
closedLocally: bool
|
||||
receivedData: AsyncEvent
|
||||
returnedEof: bool
|
||||
|
||||
proc `$`(channel: YamuxChannel): string =
|
||||
result = if channel.conn.dir == Out: "=> " else: "<= "
|
||||
@@ -204,8 +203,8 @@ proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
|
||||
|
||||
method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
|
||||
if not channel.closedLocally:
|
||||
trace "Closing yamux channel locally", streamId = channel.id, conn = channel.conn
|
||||
channel.closedLocally = true
|
||||
channel.isEof = true
|
||||
|
||||
if not channel.isReset and channel.sendQueue.len == 0:
|
||||
try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
|
||||
@@ -273,7 +272,7 @@ method readOnce*(
|
||||
newLPStreamClosedError()
|
||||
else:
|
||||
newLPStreamConnDownError()
|
||||
if channel.returnedEof:
|
||||
if channel.isEof:
|
||||
raise newLPStreamRemoteClosedError()
|
||||
if channel.recvQueue.len == 0:
|
||||
channel.receivedData.clear()
|
||||
@@ -281,9 +280,8 @@ method readOnce*(
|
||||
discard await race(channel.closedRemotely, channel.receivedData.wait())
|
||||
except ValueError: raiseAssert("Futures list is not empty")
|
||||
if channel.closedRemotely.completed() and channel.recvQueue.len == 0:
|
||||
channel.returnedEof = true
|
||||
channel.isEof = true
|
||||
return 0
|
||||
return 0 # we return 0 to indicate that the channel is closed for reading from now on
|
||||
|
||||
let toRead = min(channel.recvQueue.len, nbytes)
|
||||
|
||||
|
||||
@@ -24,12 +24,16 @@ type
  AddressMapper* =
    proc(listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]]
      {.gcsafe, raises: [].}
    ## A proc that is expected to resolve the listen addresses into dialable addresses

  PeerInfo* {.public.} = ref object
    peerId*: PeerId
    listenAddrs*: seq[MultiAddress]
      ## contains addresses the node listens on, which may include wildcard and private addresses (not directly reachable).
    addrs: seq[MultiAddress]
      ## contains resolved addresses that other peers can use to connect, including public-facing NAT and port-forwarded addresses.
    addressMappers*: seq[AddressMapper]
      ## contains a list of procs that can be used to resolve the listen addresses into dialable addresses.
    protocols*: seq[string]
    protoVersion*: string
    agentVersion*: string

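A minimal AddressMapper sketch (the external address is illustrative, peerInfo is an existing PeerInfo instance, and the calls assume an async context; addressMappers and update are the PeerInfo members used elsewhere in this change set):

  let externalAddr = MultiAddress.init("/ip4/203.0.113.7/tcp/4001").expect("valid address")
  let mapper: AddressMapper = proc(
      listenAddrs: seq[MultiAddress]
  ): Future[seq[MultiAddress]] {.async.} =
    # Advertise the externally reachable address alongside the listen addresses.
    return listenAddrs & @[externalAddr]

  peerInfo.addressMappers.add(mapper)
  await peerInfo.update()  # re-runs all mappers and refreshes peerInfo.addrs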
@@ -56,7 +56,7 @@ method init*(p: Ping) =
      trace "handling ping", conn
      var buf: array[PingSize, byte]
      await conn.readExactly(addr buf[0], PingSize)
      trace "echoing ping", conn
      trace "echoing ping", conn, pingData = @buf
      await conn.write(@buf)
      if not isNil(p.pingHandler):
        await p.pingHandler(conn.peerId)

@@ -16,6 +16,7 @@ import ./pubsub,
|
||||
./timedcache,
|
||||
./peertable,
|
||||
./rpc/[message, messages, protobuf],
|
||||
nimcrypto/[hash, sha2],
|
||||
../../crypto/crypto,
|
||||
../../stream/connection,
|
||||
../../peerid,
|
||||
@@ -32,25 +33,34 @@ const FloodSubCodec* = "/floodsub/1.0.0"
|
||||
type
|
||||
FloodSub* {.public.} = ref object of PubSub
|
||||
floodsub*: PeerTable # topic to remote peer map
|
||||
seen*: TimedCache[MessageId] # message id:s already seen on the network
|
||||
seenSalt*: seq[byte]
|
||||
seen*: TimedCache[SaltedId]
|
||||
# Early filter for messages recently observed on the network
|
||||
# We use a salted id because the messages in this cache have not yet
|
||||
# been validated meaning that an attacker has greater control over the
|
||||
# hash key and therefore could poison the table
|
||||
seenSalt*: sha256
|
||||
# The salt in this case is a partially updated SHA256 context pre-seeded
|
||||
# with some random data
|
||||
|
||||
proc hasSeen*(f: FloodSub, msgId: MessageId): bool =
|
||||
f.seenSalt & msgId in f.seen
|
||||
proc salt*(f: FloodSub, msgId: MessageId): SaltedId =
|
||||
var tmp = f.seenSalt
|
||||
tmp.update(msgId)
|
||||
SaltedId(data: tmp.finish())
|
||||
|
||||
proc addSeen*(f: FloodSub, msgId: MessageId): bool =
|
||||
# Salting the seen hash helps avoid attacks against the hash function used
|
||||
# in the nim hash table
|
||||
proc hasSeen*(f: FloodSub, saltedId: SaltedId): bool =
|
||||
saltedId in f.seen
|
||||
|
||||
proc addSeen*(f: FloodSub, saltedId: SaltedId): bool =
|
||||
# Return true if the message has already been seen
|
||||
f.seen.put(f.seenSalt & msgId)
|
||||
f.seen.put(saltedId)
|
||||
|
||||
proc firstSeen*(f: FloodSub, msgId: MessageId): Moment =
|
||||
f.seen.addedAt(f.seenSalt & msgId)
|
||||
proc firstSeen*(f: FloodSub, saltedId: SaltedId): Moment =
|
||||
f.seen.addedAt(saltedId)
|
||||
|
||||
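A sketch of the new salted-seen flow defined above (f is a FloodSub instance; the message id bytes are illustrative):

  let
    msgId: MessageId = @[1'u8, 2, 3]   # raw, unvalidated message id from the wire
    saltedId = f.salt(msgId)           # SHA-256 of the pre-seeded salt context plus the id

  if not f.hasSeen(saltedId):
    discard f.addSeen(saltedId)        # returns true if the message had already been seen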
proc handleSubscribe*(f: FloodSub,
|
||||
peer: PubSubPeer,
|
||||
topic: string,
|
||||
subscribe: bool) =
|
||||
proc handleSubscribe(f: FloodSub,
|
||||
peer: PubSubPeer,
|
||||
topic: string,
|
||||
subscribe: bool) =
|
||||
logScope:
|
||||
peer
|
||||
topic
|
||||
@@ -96,10 +106,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =
|
||||
method rpcHandler*(f: FloodSub,
|
||||
peer: PubSubPeer,
|
||||
data: seq[byte]) {.async.} =
|
||||
|
||||
var rpcMsg = decodeRpcMsg(data).valueOr:
|
||||
debug "failed to decode msg from peer", peer, err = error
|
||||
raise newException(CatchableError, "")
|
||||
raise newException(CatchableError, "Peer msg couldn't be decoded")
|
||||
|
||||
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
|
||||
# trigger hooks
|
||||
@@ -117,9 +126,11 @@ method rpcHandler*(f: FloodSub,
|
||||
# TODO: descore peers due to error during message validation (malicious?)
|
||||
continue
|
||||
|
||||
let msgId = msgIdResult.get
|
||||
let
|
||||
msgId = msgIdResult.get
|
||||
saltedId = f.salt(msgId)
|
||||
|
||||
if f.addSeen(msgId):
|
||||
if f.addSeen(saltedId):
|
||||
trace "Dropping already-seen message", msgId, peer
|
||||
continue
|
||||
|
||||
@@ -216,7 +227,7 @@ method publish*(f: FloodSub,
|
||||
trace "Created new message",
|
||||
msg = shortLog(msg), peers = peers.len, topic, msgId
|
||||
|
||||
if f.addSeen(msgId):
|
||||
if f.addSeen(f.salt(msgId)):
|
||||
# custom msgid providers might cause this
|
||||
trace "Dropping already-seen message", msgId, topic
|
||||
return 0
|
||||
@@ -234,8 +245,11 @@ method publish*(f: FloodSub,
|
||||
method initPubSub*(f: FloodSub)
|
||||
{.raises: [InitializationError].} =
|
||||
procCall PubSub(f).initPubSub()
|
||||
f.seen = TimedCache[MessageId].init(2.minutes)
|
||||
f.seenSalt = newSeqUninitialized[byte](sizeof(Hash))
|
||||
hmacDrbgGenerate(f.rng[], f.seenSalt)
|
||||
f.seen = TimedCache[SaltedId].init(2.minutes)
|
||||
f.seenSalt.init()
|
||||
|
||||
var tmp: array[32, byte]
|
||||
hmacDrbgGenerate(f.rng[], tmp)
|
||||
f.seenSalt.update(tmp)
|
||||
|
||||
f.init()
|
||||
|
||||
@@ -266,10 +266,10 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
|
||||
|
||||
procCall FloodSub(g).unsubscribePeer(peer)
|
||||
|
||||
proc handleSubscribe*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
topic: string,
|
||||
subscribe: bool) =
|
||||
proc handleSubscribe(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
topic: string,
|
||||
subscribe: bool) =
|
||||
logScope:
|
||||
peer
|
||||
topic
|
||||
@@ -360,13 +360,13 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
|
||||
proc validateAndRelay(g: GossipSub,
|
||||
msg: Message,
|
||||
msgId, msgIdSalted: MessageId,
|
||||
msgId: MessageId, saltedId: SaltedId,
|
||||
peer: PubSubPeer) {.async.} =
|
||||
try:
|
||||
let validation = await g.validate(msg)
|
||||
|
||||
var seenPeers: HashSet[PubSubPeer]
|
||||
discard g.validationSeen.pop(msgIdSalted, seenPeers)
|
||||
discard g.validationSeen.pop(saltedId, seenPeers)
|
||||
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
|
||||
libp2p_gossipsub_saved_bytes.inc((msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"])
|
||||
|
||||
@@ -395,9 +395,7 @@ proc validateAndRelay(g: GossipSub,
|
||||
|
||||
g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
|
||||
g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
|
||||
|
||||
# add direct peers
|
||||
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(topic))
|
||||
g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[])
|
||||
|
||||
# Don't send it to source peer, or peers that
|
||||
# sent it during validation
|
||||
@@ -413,14 +411,13 @@ proc validateAndRelay(g: GossipSub,
|
||||
|
||||
for peer in toSendPeers:
|
||||
for heDontWant in peer.heDontWants:
|
||||
if msgId in heDontWant:
|
||||
if saltedId in heDontWant:
|
||||
seenPeers.incl(peer)
|
||||
libp2p_gossipsub_idontwant_saved_messages.inc
|
||||
libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"])
|
||||
break
|
||||
toSendPeers.excl(seenPeers)
|
||||
|
||||
|
||||
# In theory, if topics are the same in all messages, we could batch - we'd
|
||||
# also have to be careful to only include validated messages
|
||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
|
||||
@@ -469,6 +466,11 @@ method rpcHandler*(g: GossipSub,
|
||||
var rpcMsg = decodeRpcMsg(data).valueOr:
|
||||
debug "failed to decode msg from peer", peer, err = error
|
||||
await rateLimit(g, peer, msgSize)
|
||||
# Raising in the handler closes the gossipsub connection (but doesn't
|
||||
# disconnect the peer!)
|
||||
# TODO evaluate behaviour penalty values
|
||||
peer.behaviourPenalty += 0.1
|
||||
|
||||
raise newException(CatchableError, "Peer msg couldn't be decoded")
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
@@ -478,12 +480,13 @@ method rpcHandler*(g: GossipSub,
|
||||
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
|
||||
await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))
|
||||
|
||||
# trigger hooks
|
||||
# trigger hooks - these may modify the message
|
||||
peer.recvObservers(rpcMsg)
|
||||
|
||||
if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
|
||||
g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
|
||||
peer.pingBudget.dec
|
||||
|
||||
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
|
||||
template sub: untyped = rpcMsg.subscriptions[i]
|
||||
g.handleSubscribe(peer, sub.topic, sub.subscribe)
|
||||
@@ -503,17 +506,15 @@ method rpcHandler*(g: GossipSub,
|
||||
if msgIdResult.isErr:
|
||||
debug "Dropping message due to failed message id generation",
|
||||
error = msgIdResult.error
|
||||
# TODO: descore peers due to error during message validation (malicious?)
|
||||
await g.punishInvalidMessage(peer, msg)
|
||||
continue
|
||||
|
||||
let
|
||||
msgId = msgIdResult.get
|
||||
msgIdSalted = msgId & g.seenSalt
|
||||
msgIdSalted = g.salt(msgId)
|
||||
topic = msg.topic
|
||||
|
||||
# addSeen adds salt to msgId to avoid
|
||||
# remote attacking the hash function
|
||||
if g.addSeen(msgId):
|
||||
if g.addSeen(msgIdSalted):
|
||||
trace "Dropping already-seen message", msgId = shortLog(msgId), peer
|
||||
|
||||
var alreadyReceived = false
|
||||
@@ -523,7 +524,7 @@ method rpcHandler*(g: GossipSub,
|
||||
alreadyReceived = true
|
||||
|
||||
if not alreadyReceived:
|
||||
let delay = Moment.now() - g.firstSeen(msgId)
|
||||
let delay = Moment.now() - g.firstSeen(msgIdSalted)
|
||||
g.rewardDelivered(peer, topic, false, delay)
|
||||
|
||||
libp2p_gossipsub_duplicate.inc()
|
||||
@@ -600,25 +601,24 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
|
||||
|
||||
g.mesh.del(topic)
|
||||
|
||||
|
||||
# Send unsubscribe (in reverse order to sub/graft)
|
||||
procCall PubSub(g).onTopicSubscription(topic, subscribed)
|
||||
|
||||
method publish*(g: GossipSub,
|
||||
topic: string,
|
||||
data: seq[byte]): Future[int] {.async.} =
|
||||
# base returns always 0
|
||||
discard await procCall PubSub(g).publish(topic, data)
|
||||
|
||||
logScope:
|
||||
topic
|
||||
|
||||
trace "Publishing message on topic", data = data.shortLog
|
||||
|
||||
if topic.len <= 0: # data could be 0/empty
|
||||
debug "Empty topic, skipping publish"
|
||||
return 0
|
||||
|
||||
# base returns always 0
|
||||
discard await procCall PubSub(g).publish(topic, data)
|
||||
|
||||
trace "Publishing message on topic", data = data.shortLog
|
||||
|
||||
var peers: HashSet[PubSubPeer]
|
||||
|
||||
# add always direct peers
|
||||
@@ -631,38 +631,39 @@ method publish*(g: GossipSub,
|
||||
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
|
||||
# but a peer's own messages will always be published to all known peers in the topic, limited
|
||||
# to the amount of peers we can send it to in one heartbeat
|
||||
var maxPeersToFlodOpt: Opt[int64]
|
||||
if g.parameters.bandwidthEstimatebps > 0:
|
||||
let
|
||||
bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
|
||||
msToTransmit = max(data.len div bandwidth, 1)
|
||||
maxPeersToFlodOpt = Opt.some(max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow))
|
||||
|
||||
let maxPeersToFlood =
|
||||
if g.parameters.bandwidthEstimatebps > 0:
|
||||
let
|
||||
bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
|
||||
msToTransmit = max(data.len div bandwidth, 1)
|
||||
max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)
|
||||
else:
|
||||
int.high() # unlimited
|
||||
|
||||
for peer in g.gossipsub.getOrDefault(topic):
|
||||
maxPeersToFlodOpt.withValue(maxPeersToFlod):
|
||||
if peers.len >= maxPeersToFlod: break
|
||||
if peers.len >= maxPeersToFlood: break
|
||||
|
||||
if peer.score >= g.parameters.publishThreshold:
|
||||
trace "publish: including flood/high score peer", peer
|
||||
peers.incl(peer)
|
||||
|
||||
if peers.len < g.parameters.dLow:
|
||||
# not subscribed, or bad mesh, send to fanout peers
|
||||
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
||||
if fanoutPeers.len < g.parameters.dLow:
|
||||
g.replenishFanout(topic)
|
||||
fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
||||
elif peers.len < g.parameters.dLow:
|
||||
# not subscribed or bad mesh, send to fanout peers
|
||||
# when flood-publishing, fanout won't help since all potential peers have
|
||||
# already been added
|
||||
|
||||
g.replenishFanout(topic) # Make sure fanout is populated
|
||||
|
||||
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
|
||||
g.rng.shuffle(fanoutPeers)
|
||||
|
||||
for fanPeer in fanoutPeers:
|
||||
peers.incl(fanPeer)
|
||||
if peers.len > g.parameters.d: break
|
||||
|
||||
# even if we couldn't publish,
|
||||
# we still attempted to publish
|
||||
# on the topic, so it makes sense
|
||||
# to update the last topic publish
|
||||
# time
|
||||
# Attempting to publish counts as fanout send (even if the message
|
||||
# ultimately is not sent)
|
||||
g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)
|
||||
|
||||
if peers.len == 0:
|
||||
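A worked example of the flood-publish cap computed above in publish (all numbers are illustrative, not library defaults):

  let
    bandwidthEstimatebps = 100_000_000                 # 100 Mbit/s estimate
    bandwidth = bandwidthEstimatebps div 8 div 1000    # = 12_500 bytes per millisecond
    msToTransmit = max(50_000 div bandwidth, 1)        # a 50 kB message -> 4 ms
    maxPeersToFlood = max(700 div msToTransmit, 6)     # 700 ms heartbeat, dLow = 6 -> 175 peers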
@@ -690,8 +691,10 @@ method publish*(g: GossipSub,
|
||||
|
||||
trace "Created new message", msg = shortLog(msg), peers = peers.len
|
||||
|
||||
if g.addSeen(msgId):
|
||||
# custom msgid providers might cause this
|
||||
if g.addSeen(g.salt(msgId)):
|
||||
# If the message was received or published recently, don't re-publish it -
|
||||
# this might happen when not using sequence id:s and / or with a custom
|
||||
# message id provider
|
||||
trace "Dropping already-seen message"
|
||||
return 0
|
||||
|
||||
@@ -779,7 +782,7 @@ method initPubSub*(g: GossipSub)
|
||||
raise newException(InitializationError, $validationRes.error)
|
||||
|
||||
# init the floodsub stuff here, we customize timedcache in gossip!
|
||||
g.seen = TimedCache[MessageId].init(g.parameters.seenTTL)
|
||||
g.seen = TimedCache[SaltedId].init(g.parameters.seenTTL)
|
||||
|
||||
# init gossip stuff
|
||||
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
|
||||
|
||||
@@ -30,7 +30,7 @@ declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh wi
|
||||
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
|
||||
declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])
|
||||
|
||||
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [].} =
|
||||
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) =
|
||||
g.withPeerStats(p.peerId) do (stats: var PeerStats):
|
||||
var info = stats.topicInfos.getOrDefault(topic)
|
||||
info.graftTime = Moment.now()
|
||||
@@ -46,7 +46,7 @@ proc pruned*(g: GossipSub,
|
||||
p: PubSubPeer,
|
||||
topic: string,
|
||||
setBackoff: bool = true,
|
||||
backoff = none(Duration)) {.raises: [].} =
|
||||
backoff = none(Duration)) =
|
||||
if setBackoff:
|
||||
let
|
||||
backoffDuration = backoff.get(g.parameters.pruneBackoff)
|
||||
@@ -70,7 +70,7 @@ proc pruned*(g: GossipSub,
|
||||
|
||||
trace "pruned", peer=p, topic
|
||||
|
||||
proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
|
||||
proc handleBackingOff*(t: var BackoffTable, topic: string) =
|
||||
let now = Moment.now()
|
||||
var expired = toSeq(t.getOrDefault(topic).pairs())
|
||||
expired.keepIf do (pair: tuple[peer: PeerId, expire: Moment]) -> bool:
|
||||
@@ -79,7 +79,7 @@ proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
|
||||
t.withValue(topic, v):
|
||||
v[].del(peer)
|
||||
|
||||
proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: [].} =
|
||||
proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
|
||||
if not g.parameters.enablePX:
|
||||
return @[]
|
||||
var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
|
||||
@@ -100,7 +100,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:
|
||||
|
||||
proc handleGraft*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
|
||||
grafts: seq[ControlGraft]): seq[ControlPrune] =
|
||||
var prunes: seq[ControlPrune]
|
||||
for graft in grafts:
|
||||
let topic = graft.topicID
|
||||
@@ -204,8 +204,7 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRe
|
||||
|
||||
routingRecords
|
||||
|
||||
|
||||
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [].} =
|
||||
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
||||
for prune in prunes:
|
||||
let topic = prune.topicID
|
||||
|
||||
@@ -239,7 +238,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
|
||||
|
||||
proc handleIHave*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
ihaves: seq[ControlIHave]): ControlIWant {.raises: [].} =
|
||||
ihaves: seq[ControlIHave]): ControlIWant =
|
||||
var res: ControlIWant
|
||||
if peer.score < g.parameters.gossipThreshold:
|
||||
trace "ihave: ignoring low score peer", peer, score = peer.score
|
||||
@@ -251,7 +250,7 @@ proc handleIHave*(g: GossipSub,
|
||||
peer, topicID = ihave.topicID, msgs = ihave.messageIDs
|
||||
if ihave.topicID in g.topics:
|
||||
for msgId in ihave.messageIDs:
|
||||
if not g.hasSeen(msgId):
|
||||
if not g.hasSeen(g.salt(msgId)):
|
||||
if peer.iHaveBudget <= 0:
|
||||
break
|
||||
elif msgId notin res.messageIDs:
|
||||
@@ -269,12 +268,11 @@ proc handleIDontWant*(g: GossipSub,
|
||||
for dontWant in iDontWants:
|
||||
for messageId in dontWant.messageIDs:
|
||||
if peer.heDontWants[^1].len > 1000: break
|
||||
if messageId.len > 100: continue
|
||||
peer.heDontWants[^1].incl(messageId)
|
||||
peer.heDontWants[^1].incl(g.salt(messageId))
|
||||
|
||||
proc handleIWant*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
|
||||
iwants: seq[ControlIWant]): seq[Message] =
|
||||
var
|
||||
messages: seq[Message]
|
||||
invalidRequests = 0
|
||||
@@ -300,7 +298,7 @@ proc handleIWant*(g: GossipSub,
|
||||
messages.add(msg)
|
||||
return messages
|
||||
|
||||
proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
|
||||
proc commitMetrics(metrics: var MeshMetrics) =
|
||||
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
|
||||
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
|
||||
libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
|
||||
@@ -309,7 +307,7 @@ proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
|
||||
libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
|
||||
libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
|
||||
|
||||
proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) {.raises: [].} =
|
||||
proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
|
||||
logScope:
|
||||
topic
|
||||
mesh = g.mesh.peers(topic)
|
||||
@@ -539,7 +537,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
|
||||
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
|
||||
g.broadcast(prunes, prune, isHighPriority = true)
|
||||
|
||||
proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
|
||||
proc dropFanoutPeers*(g: GossipSub) =
|
||||
# drop peers that we haven't published to in
|
||||
# GossipSubFanoutTTL seconds
|
||||
let now = Moment.now()
|
||||
@@ -552,7 +550,7 @@ proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
|
||||
for topic in drops:
|
||||
g.lastFanoutPubSub.del topic
|
||||
|
||||
proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
|
||||
proc replenishFanout*(g: GossipSub, topic: string) =
|
||||
## get fanout peers for a topic
|
||||
logScope: topic
|
||||
trace "about to replenish fanout"
|
||||
@@ -568,7 +566,7 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
|
||||
|
||||
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
|
||||
|
||||
proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: [].} =
|
||||
proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] =
|
||||
## gossip iHave messages to peers
|
||||
##
|
||||
|
||||
@@ -612,26 +610,25 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
|
||||
x notin gossipPeers and
|
||||
x.score >= g.parameters.gossipThreshold
|
||||
|
||||
var target = g.parameters.dLazy
|
||||
let factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
|
||||
if factor > target:
|
||||
target = min(factor, allPeers.len)
|
||||
# https://github.com/libp2p/specs/blob/98c5aa9421703fc31b0833ad8860a55db15be063/pubsub/gossipsub/gossipsub-v1.1.md#adaptive-gossip-dissemination
|
||||
let
|
||||
factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
|
||||
target = max(g.parameters.dLazy, factor)
|
||||
|
||||
if target < allPeers.len:
|
||||
g.rng.shuffle(allPeers)
|
||||
allPeers.setLen(target)
|
||||
|
||||
let msgIdsAsSet = ihave.messageIDs.toHashSet()
|
||||
|
||||
for peer in allPeers:
|
||||
control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
|
||||
peer.sentIHaves[^1].incl(msgIdsAsSet)
|
||||
for msgId in ihave.messageIDs:
|
||||
peer.sentIHaves[^1].incl(msgId)
|
||||
|
||||
libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
|
||||
|
||||
return control
|
||||
|
||||
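For the adaptive gossip target computed in getGossipPeers above, an illustrative calculation: with gossipFactor = 0.25 and 40 eligible peers, factor = int(0.25 * 40) = 10, so target = max(dLazy = 6, 10) = 10 and IHAVE gossip goes to 10 randomly chosen peers; with only 16 eligible peers, factor = 4 and the dLazy floor of 6 applies instead.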
proc onHeartbeat(g: GossipSub) {.raises: [].} =
|
||||
proc onHeartbeat(g: GossipSub) =
|
||||
# reset IWANT budget
|
||||
# reset IHAVE cap
|
||||
block:
|
||||
@@ -639,7 +636,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
|
||||
peer.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||
if peer.sentIHaves.len > g.parameters.historyLength:
|
||||
discard peer.sentIHaves.popLast()
|
||||
peer.heDontWants.addFirst(default(HashSet[MessageId]))
|
||||
peer.heDontWants.addFirst(default(HashSet[SaltedId]))
|
||||
if peer.heDontWants.len > g.parameters.historyLength:
|
||||
discard peer.heDontWants.popLast()
|
||||
peer.iHaveBudget = IHavePeerBudget
|
||||
@@ -695,8 +692,6 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
|
||||
|
||||
g.mcache.shift() # shift the cache
|
||||
|
||||
# {.pop.} # raises []
|
||||
|
||||
proc heartbeat*(g: GossipSub) {.async.} =
|
||||
heartbeat "GossipSub", g.parameters.heartbeatInterval:
|
||||
trace "running heartbeat", instance = cast[int](g)
|
||||
|
||||
@@ -87,8 +87,6 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
  else:
    0.0

{.pop.}

proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
  try:
    await g.switch.disconnect(peer.peerId)

@@ -156,7 +156,7 @@ type
    maxNumElementsInNonPriorityQueue*: int

  BackoffTable* = Table[string, Table[PeerId, Moment]]
  ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
  ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]]

  RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]]
  RoutingRecordsHandler* =
@@ -172,8 +172,6 @@ type
    subscribedDirectPeers*: PeerTable # directpeers that we keep alive
    backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
    lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
    gossip*: Table[string, seq[ControlIHave]] # pending gossip
    control*: Table[string, ControlMessage] # pending control messages
    mcache*: MCache # messages cache
    validationSeen*: ValidationSeenTable # peers who sent us message in validation
    heartbeatFut*: Future[void] # cancellation future for heartbeat interval

@@ -9,50 +9,57 @@

{.push raises: [].}

import std/[sets, tables, options]
import std/[sets, tables]
import rpc/[messages]
import results

export sets, tables, messages, options
export sets, tables, messages, results

type
  CacheEntry* = object
    mid*: MessageId
    msgId*: MessageId
    topic*: string

  MCache* = object of RootObj
    msgs*: Table[MessageId, Message]
    history*: seq[seq[CacheEntry]]
    pos*: int
    windowSize*: Natural

func get*(c: MCache, mid: MessageId): Option[Message] =
  if mid in c.msgs:
    try: some(c.msgs[mid])
func get*(c: MCache, msgId: MessageId): Opt[Message] =
  if msgId in c.msgs:
    try: Opt.some(c.msgs[msgId])
    except KeyError: raiseAssert "checked"
  else:
    none(Message)
    Opt.none(Message)

func contains*(c: MCache, mid: MessageId): bool =
  mid in c.msgs
func contains*(c: MCache, msgId: MessageId): bool =
  msgId in c.msgs

func put*(c: var MCache, msgId: MessageId, msg: Message) =
  if not c.msgs.hasKeyOrPut(msgId, msg):
    # Only add cache entry if the message was not already in the cache
    c.history[0].add(CacheEntry(mid: msgId, topic: msg.topic))
    c.history[c.pos].add(CacheEntry(msgId: msgId, topic: msg.topic))

func window*(c: MCache, topic: string): HashSet[MessageId] =
  let
    len = min(c.windowSize, c.history.len)

  for i in 0..<len:
    for entry in c.history[i]:
    # Work backwards from `pos` in the circular buffer
    for entry in c.history[(c.pos + c.history.len - i) mod c.history.len]:
      if entry.topic == topic:
        result.incl(entry.mid)
        result.incl(entry.msgId)

func shift*(c: var MCache) =
  for entry in c.history.pop():
    c.msgs.del(entry.mid)
  # Shift circular buffer to write to a new position, clearing it from past
  # iterations
  c.pos = (c.pos + 1) mod c.history.len

  c.history.insert(@[])
  for entry in c.history[c.pos]:
    c.msgs.del(entry.msgId)

  reset(c.history[c.pos])

func init*(T: type MCache, window, history: Natural): T =
  T(

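A sketch of the resulting cache semantics (assuming init allocates `history` slots for the circular buffer; Message and MessageId come from rpc/messages, and fields other than topic are left at their defaults):

  var cache = MCache.init(window = 3, history = 5)
  let
    msgId: MessageId = @[1'u8, 2, 3]
    msg = Message(topic: "mytopic")

  cache.put(msgId, msg)                 # stored at history[cache.pos]
  assert msgId in cache
  assert cache.get(msgId).isSome()
  assert msgId in cache.window("mytopic")

  for _ in 0 ..< 5:
    cache.shift()                       # once pos wraps back around, the entry is evicted
  assert msgId notin cache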
@@ -30,7 +30,6 @@ import ./errors as pubsub_errors,
  ../../errors,
  ../../utility

import metrics
import stew/results
export results

@@ -80,7 +80,10 @@ type
|
||||
|
||||
score*: float64
|
||||
sentIHaves*: Deque[HashSet[MessageId]]
|
||||
heDontWants*: Deque[HashSet[MessageId]]
|
||||
heDontWants*: Deque[HashSet[SaltedId]]
|
||||
## IDONTWANT contains unvalidated message id:s which may be long and/or
|
||||
## expensive to look up, so we apply the same salting to them as during
|
||||
## unvalidated message processing
|
||||
iHaveBudget*: int
|
||||
pingBudget*: int
|
||||
maxMessageSize: int
|
||||
@@ -301,7 +304,7 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
|
||||
if p.sendConn == nil:
|
||||
# Wait for a send conn to be setup. `connectOnce` will
|
||||
# complete this even if the sendConn setup failed
|
||||
await p.connectedFut
|
||||
discard await race(p.connectedFut)
|
||||
|
||||
var conn = p.sendConn
|
||||
if conn == nil or conn.closed():
|
||||
@@ -336,14 +339,21 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
|
||||
## priority messages have been sent.
|
||||
doAssert(not isNil(p), "pubsubpeer nil!")
|
||||
|
||||
p.clearSendPriorityQueue()
|
||||
|
||||
# When queues are empty, skipping the non-priority queue for low priority
|
||||
# messages reduces latency
|
||||
let emptyQueues =
|
||||
(p.rpcmessagequeue.sendPriorityQueue.len() +
|
||||
p.rpcmessagequeue.nonPriorityQueue.len()) == 0
|
||||
|
||||
if msg.len <= 0:
|
||||
debug "empty message, skipping", p, msg = shortLog(msg)
|
||||
Future[void].completed()
|
||||
elif msg.len > p.maxMessageSize:
|
||||
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
|
||||
Future[void].completed()
|
||||
elif isHighPriority:
|
||||
p.clearSendPriorityQueue()
|
||||
elif isHighPriority or emptyQueues:
|
||||
let f = p.sendMsg(msg)
|
||||
if not f.finished:
|
||||
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
|
||||
@@ -504,5 +514,5 @@ proc new*(
|
||||
maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue
|
||||
)
|
||||
result.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||
result.heDontWants.addFirst(default(HashSet[MessageId]))
|
||||
result.heDontWants.addFirst(default(HashSet[SaltedId]))
|
||||
result.startSendNonPriorityTask()
|
||||
|
||||
@@ -9,8 +9,8 @@

{.push raises: [].}

import options, sequtils, sugar
import "../../.."/[
import options, sequtils
import ../../../[
  peerid,
  routing_record,
  utility
@@ -37,6 +37,12 @@ type

  MessageId* = seq[byte]

  SaltedId* = object
    # Salted hash of message ID - used instead of the ordinary message ID to
    # avoid hash poisoning attacks and to make memory usage more predictable
    # with respect to the variable-length message id
    data*: MDigest[256]

  Message* = object
    fromPeer*: PeerId
    data*: seq[byte]

@@ -9,12 +9,13 @@

{.push raises: [].}

import std/[tables]

import std/[hashes, sets]
import chronos/timer, stew/results

import ../../utility

export results

const Timeout* = 10.seconds # default timeout in ms

type
@@ -26,20 +27,38 @@ type

  TimedCache*[K] = object of RootObj
    head, tail: TimedEntry[K] # nim linked list doesn't allow inserting at pos
    entries: Table[K, TimedEntry[K]]
    entries: HashSet[TimedEntry[K]]
    timeout: Duration

func `==`*[E](a, b: TimedEntry[E]): bool =
  if isNil(a) == isNil(b):
    isNil(a) or a.key == b.key
  else:
    false

func hash*(a: TimedEntry): Hash =
  if isNil(a):
    default(Hash)
  else:
    hash(a[].key)

func expire*(t: var TimedCache, now: Moment = Moment.now()) =
  while t.head != nil and t.head.expiresAt < now:
    t.entries.del(t.head.key)
    t.entries.excl(t.head)
    t.head.prev = nil
    t.head = t.head.next
  if t.head == nil: t.tail = nil

func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] =
  # Removes existing key from cache, returning the previous value if present
  var item: TimedEntry[K]
  if t.entries.pop(key, item):
  let tmp = TimedEntry[K](key: key)
  if tmp in t.entries:
    let item = try:
      t.entries[tmp] # use the shared instance in the set
    except KeyError:
      raiseAssert "just checked"
    t.entries.excl(item)

    if t.head == item: t.head = item.next
    if t.tail == item: t.tail = item.prev

@@ -55,14 +74,14 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
  # refreshed.
  t.expire(now)

  var previous = t.del(k) # Refresh existing item

  var addedAt = now
  previous.withValue(previous):
    addedAt = previous.addedAt
  let
    previous = t.del(k) # Refresh existing item
    addedAt = if previous.isSome():
      previous[].addedAt
    else:
      now

  let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout)

  if t.head == nil:
    t.tail = node
    t.head = t.tail
@@ -83,16 +102,24 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
  if cur == t.tail:
    t.tail = node

  t.entries[k] = node
  t.entries.incl(node)

  previous.isSome()

func contains*[K](t: TimedCache[K], k: K): bool =
  k in t.entries
  let tmp = TimedEntry[K](key: k)
  tmp in t.entries

func addedAt*[K](t: TimedCache[K], k: K): Moment =
  t.entries.getOrDefault(k).addedAt
func addedAt*[K](t: var TimedCache[K], k: K): Moment =
  let tmp = TimedEntry[K](key: k)
  try:
    if tmp in t.entries: # raising is slow
      # Use shared instance from entries
      return t.entries[tmp][].addedAt
  except KeyError:
    raiseAssert "just checked"

  default(Moment)

func init*[K](T: type TimedCache[K], timeout: Duration = Timeout): T =
  T(

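A sketch of the put/contains/addedAt semantics after this change (string keys and the time values are illustrative):

  var cache = TimedCache[string].init(10.seconds)
  let now = Moment.now()

  assert cache.put("a", now) == false          # first insertion: not previously seen
  assert cache.put("a", now + 1.seconds)       # refresh: returns true, original addedAt is kept
  assert "a" in cache
  assert cache.addedAt("a") == now

  # Expiry is lazy: a later put() drops entries older than the timeout.
  discard cache.put("b", now + 15.seconds)
  assert "a" notin cache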
213
libp2p/services/wildcardresolverservice.nim
Normal file
213
libp2p/services/wildcardresolverservice.nim
Normal file
@@ -0,0 +1,213 @@
|
||||
# Nim-LibP2P
|
||||
# Copyright (c) 2024 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/sequtils
|
||||
import stew/[byteutils, results, endians2]
|
||||
import chronos, chronos/transports/[osnet, ipnet], chronicles
|
||||
import ../[multiaddress, multicodec]
|
||||
import ../switch
|
||||
|
||||
logScope:
|
||||
topics = "libp2p wildcardresolverservice"
|
||||
|
||||
type
|
||||
WildcardAddressResolverService* = ref object of Service
|
||||
## Service used to resolve wildcard addresses of the type "0.0.0.0" for IPv4 or "::" for IPv6.
|
||||
## When used with a `Switch`, this service will be automatically set up and stopped
|
||||
## when the switch starts and stops. This is facilitated by adding the service to the switch's
|
||||
## list of services using the `.withServices(@[svc])` method in the `SwitchBuilder`.
|
||||
networkInterfaceProvider: NetworkInterfaceProvider
|
||||
## Provides a list of network addresses.
|
||||
addressMapper: AddressMapper
|
||||
## An implementation of an address mapper that takes a list of listen addresses and expands each wildcard address
|
||||
## to the respective list of interface addresses. As an example, if the listen address is 0.0.0.0:4001
|
||||
## and the machine has 2 interfaces with IPs 172.217.11.174 and 64.233.177.113, the address mapper will
|
||||
## expand the wildcard address to 172.217.11.174:4001 and 64.233.177.113:4001.
|
||||
|
||||
NetworkInterfaceProvider* = ref object of RootObj
|
||||
|
||||
proc isLoopbackOrUp(networkInterface: NetworkInterface): bool =
|
||||
if (networkInterface.ifType == IfSoftwareLoopback) or
|
||||
(networkInterface.state == StatusUp): true else: false
|
||||
|
||||
proc networkInterfaceProvider*(): NetworkInterfaceProvider =
|
||||
## Returns a new instance of `NetworkInterfaceProvider`.
|
||||
return NetworkInterfaceProvider()
|
||||
|
||||
method getAddresses*(
|
||||
networkInterfaceProvider: NetworkInterfaceProvider, addrFamily: AddressFamily
|
||||
): seq[InterfaceAddress] {.base.} =
|
||||
## This method retrieves the addresses of network interfaces based on the specified address family.
|
||||
##
|
||||
## The `getAddresses` method filters the available network interfaces to include only
|
||||
## those that are either loopback or up. It then collects all the addresses from these
|
||||
## interfaces and filters them to match the provided address family.
|
||||
##
|
||||
## Parameters:
|
||||
## - `networkInterfaceProvider`: A provider that offers access to network interfaces.
|
||||
## - `addrFamily`: The address family to filter the network addresses (e.g., `AddressFamily.IPv4` or `AddressFamily.IPv6`).
|
||||
##
|
||||
## Returns:
|
||||
## - A sequence of `InterfaceAddress` objects that match the specified address family.
|
||||
echo "Getting addresses for address family: ", addrFamily
|
||||
let
|
||||
interfaces = getInterfaces().filterIt(it.isLoopbackOrUp())
|
||||
flatInterfaceAddresses = concat(interfaces.mapIt(it.addresses))
|
||||
filteredInterfaceAddresses =
|
||||
flatInterfaceAddresses.filterIt(it.host.family == addrFamily)
|
||||
return filteredInterfaceAddresses
|
||||
|
||||
proc new*(
|
||||
T: typedesc[WildcardAddressResolverService],
|
||||
networkInterfaceProvider: NetworkInterfaceProvider = new(NetworkInterfaceProvider),
|
||||
): T =
|
||||
## This procedure initializes a new `WildcardAddressResolverService` with the provided network interface provider.
|
||||
##
|
||||
## Parameters:
|
||||
## - `T`: The type descriptor for `WildcardAddressResolverService`.
|
||||
## - `networkInterfaceProvider`: A provider that offers access to network interfaces. Defaults to a new instance of `NetworkInterfaceProvider`.
|
||||
##
|
||||
## Returns:
|
||||
## - A new instance of `WildcardAddressResolverService`.
|
||||
return T(networkInterfaceProvider: networkInterfaceProvider)
|
||||
|
||||
proc getProtocolArgument*(ma: MultiAddress, codec: MultiCodec): MaResult[seq[byte]] =
|
||||
var buffer: seq[byte]
|
||||
for item in ma:
|
||||
let
|
||||
ritem = ?item
|
||||
code = ?ritem.protoCode()
|
||||
if code == codec:
|
||||
let arg = ?ritem.protoAddress()
|
||||
return ok(arg)
|
||||
|
||||
err("Multiaddress codec has not been found")
|
||||
|
||||
proc getWildcardMultiAddresses(
|
||||
interfaceAddresses: seq[InterfaceAddress], protocol: Protocol, port: Port
|
||||
): seq[MultiAddress] =
|
||||
var addresses: seq[MultiAddress]
|
||||
for ifaddr in interfaceAddresses:
|
||||
var address = ifaddr.host
|
||||
address.port = port
|
||||
MultiAddress.init(address, protocol).withValue(maddress):
|
||||
addresses.add(maddress)
|
||||
addresses
|
||||
|
||||
proc getWildcardAddress(
|
||||
maddress: MultiAddress,
|
||||
multiCodec: MultiCodec,
|
||||
anyAddr: openArray[uint8],
|
||||
addrFamily: AddressFamily,
|
||||
port: Port,
|
||||
networkInterfaceProvider: NetworkInterfaceProvider,
|
||||
): seq[MultiAddress] =
|
||||
var addresses: seq[MultiAddress]
|
||||
maddress.getProtocolArgument(multiCodec).withValue(address):
|
||||
if address == anyAddr:
|
||||
let filteredInterfaceAddresses = networkInterfaceProvider.getAddresses(addrFamily)
|
||||
addresses.add(
|
||||
getWildcardMultiAddresses(filteredInterfaceAddresses, IPPROTO_TCP, port)
|
||||
)
|
||||
else:
|
||||
addresses.add(maddress)
|
||||
return addresses
|
||||
|
||||
proc expandWildcardAddresses(
|
||||
networkInterfaceProvider: NetworkInterfaceProvider, listenAddrs: seq[MultiAddress]
|
||||
): seq[MultiAddress] =
|
||||
var addresses: seq[MultiAddress]
|
||||
# In this loop we expand bound addresses like `0.0.0.0` and `::` to list of interface addresses.
|
||||
for listenAddr in listenAddrs:
|
||||
if TCP_IP.matchPartial(listenAddr):
|
||||
listenAddr.getProtocolArgument(multiCodec("tcp")).withValue(portArg):
|
||||
let port = Port(uint16.fromBytesBE(portArg))
|
||||
if IP4.matchPartial(listenAddr):
|
||||
let wildcardAddresses = getWildcardAddress(
|
||||
listenAddr,
|
||||
multiCodec("ip4"),
|
||||
AnyAddress.address_v4,
|
||||
AddressFamily.IPv4,
|
||||
port,
|
||||
networkInterfaceProvider,
|
||||
)
|
||||
addresses.add(wildcardAddresses)
|
||||
elif IP6.matchPartial(listenAddr):
|
||||
let wildcardAddresses = getWildcardAddress(
|
||||
listenAddr,
|
||||
multiCodec("ip6"),
|
||||
AnyAddress6.address_v6,
|
||||
AddressFamily.IPv6,
|
||||
port,
|
||||
networkInterfaceProvider,
|
||||
)
|
||||
addresses.add(wildcardAddresses)
|
||||
else:
|
||||
addresses.add(listenAddr)
|
||||
else:
|
||||
addresses.add(listenAddr)
|
||||
addresses
|
||||
|
||||
method setup*(
|
||||
self: WildcardAddressResolverService, switch: Switch
|
||||
): Future[bool] {.async.} =
|
||||
## Sets up the `WildcardAddressResolverService`.
|
||||
##
|
||||
## This method adds the address mapper to the peer's list of address mappers.
|
||||
##
|
||||
## Parameters:
|
||||
## - `self`: The instance of `WildcardAddressResolverService` being set up.
|
||||
## - `switch`: The switch context in which the service operates.
|
||||
##
|
||||
## Returns:
|
||||
## - A `Future[bool]` that resolves to `true` if the setup was successful, otherwise `false`.
|
||||
self.addressMapper = proc(
|
||||
listenAddrs: seq[MultiAddress]
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
return expandWildcardAddresses(self.networkInterfaceProvider, listenAddrs)
|
||||
|
||||
debug "Setting up WildcardAddressResolverService"
|
||||
let hasBeenSetup = await procCall Service(self).setup(switch)
|
||||
if hasBeenSetup:
|
||||
switch.peerInfo.addressMappers.add(self.addressMapper)
|
||||
await self.run(switch)
|
||||
return hasBeenSetup
|
||||
|
||||
method run*(self: WildcardAddressResolverService, switch: Switch) {.async, public.} =
|
||||
## Runs the WildcardAddressResolverService for a given switch.
|
||||
##
|
||||
## It updates the peer information for the provided switch by running the registered address mapper. Any other
|
||||
## address mappers that are registered with the switch will also be run.
|
||||
##
|
||||
trace "Running WildcardAddressResolverService"
|
||||
await switch.peerInfo.update()
|
||||
|
||||
method stop*(
|
||||
self: WildcardAddressResolverService, switch: Switch
|
||||
): Future[bool] {.async, public.} =
|
||||
## Stops the WildcardAddressResolverService.
|
||||
##
|
||||
## Handles the shutdown process of the WildcardAddressResolverService for a given switch.
|
||||
## It removes the address mapper from the switch's list of address mappers.
|
||||
## It then updates the peer information for the provided switch. Any wildcard address wont be resolved anymore.
|
||||
##
|
||||
## Parameters:
|
||||
## - `self`: The instance of the WildcardAddressResolverService.
|
||||
## - `switch`: The Switch object associated with the service.
|
||||
##
|
||||
## Returns:
|
||||
## - A future that resolves to `true` if the service was successfully stopped, otherwise `false`.
|
||||
debug "Stopping WildcardAddressResolverService"
|
||||
let hasBeenStopped = await procCall Service(self).stop(switch)
|
||||
if hasBeenStopped:
|
||||
switch.peerInfo.addressMappers.keepItIf(it != self.addressMapper)
|
||||
await switch.peerInfo.update()
|
||||
return hasBeenStopped
|
||||
@@ -273,6 +273,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
|
||||
except CancelledError as exc:
|
||||
trace "releasing semaphore on cancellation"
|
||||
upgrades.release() # always release the slot
|
||||
return
|
||||
except CatchableError as exc:
|
||||
error "Exception in accept loop, exiting", exc = exc.msg
|
||||
upgrades.release() # always release the slot
|
||||
@@ -288,6 +289,12 @@ proc stop*(s: Switch) {.async, public.} =
|
||||
|
||||
s.started = false
|
||||
|
||||
try:
|
||||
# Stop accepting incoming connections
|
||||
await allFutures(s.acceptFuts.mapIt(it.cancelAndWait())).wait(1.seconds)
|
||||
except CatchableError as exc:
|
||||
debug "Cannot cancel accepts", error = exc.msg
|
||||
|
||||
for service in s.services:
|
||||
discard await service.stop(s)
|
||||
|
||||
@@ -302,18 +309,6 @@ proc stop*(s: Switch) {.async, public.} =
|
||||
except CatchableError as exc:
|
||||
warn "error cleaning up transports", msg = exc.msg
|
||||
|
||||
try:
|
||||
await allFutures(s.acceptFuts)
|
||||
.wait(1.seconds)
|
||||
except CatchableError as exc:
|
||||
trace "Exception while stopping accept loops", exc = exc.msg
|
||||
|
||||
# check that all futures were properly
|
||||
# stopped and otherwise cancel them
|
||||
for a in s.acceptFuts:
|
||||
if not a.finished:
|
||||
a.cancel()
|
||||
|
||||
for service in s.services:
|
||||
discard await service.stop(s)
|
||||
|
||||
|
||||
@@ -12,262 +12,327 @@
{.push raises: [].}

import std/[sequtils]
import stew/results
import chronos, chronicles
import transport,
  ../errors,
  ../wire,
  ../multicodec,
  ../connmanager,
  ../multiaddress,
  ../stream/connection,
  ../stream/chronosstream,
  ../upgrademngrs/upgrade,
  ../utility
import
  ./transport,
  ../wire,
  ../multiaddress,
  ../stream/connection,
  ../stream/chronosstream,
  ../upgrademngrs/upgrade,
  ../utility

logScope:
  topics = "libp2p tcptransport"

export transport, results
export transport, connection, upgrade

const
  TcpTransportTrackerName* = "libp2p.tcptransport"
const TcpTransportTrackerName* = "libp2p.tcptransport"
type
  AcceptFuture = typeof(default(StreamServer).accept())

  TcpTransport* = ref object of Transport
    servers*: seq[StreamServer]
    clients: array[Direction, seq[StreamTransport]]
    flags: set[ServerFlags]
    clientFlags: set[SocketFlags]
    acceptFuts: seq[Future[StreamTransport]]
    acceptFuts: seq[AcceptFuture]
    connectionsTimeout: Duration
    stopping: bool

  TcpTransportError* = object of transport.TransportError
proc connHandler*(self: TcpTransport,
client: StreamTransport,
observedAddr: Opt[MultiAddress],
dir: Direction): Future[Connection] {.async.} =

trace "Handling tcp connection", address = $observedAddr,
dir = $dir,
clients = self.clients[Direction.In].len +
self.clients[Direction.Out].len
proc connHandler*(
self: TcpTransport,
client: StreamTransport,
observedAddr: Opt[MultiAddress],
dir: Direction,
): Connection =
trace "Handling tcp connection",
address = $observedAddr,
dir = $dir,
clients = self.clients[Direction.In].len + self.clients[Direction.Out].len

let conn = Connection(
ChronosStream.init(
client = client,
dir = dir,
observedAddr = observedAddr,
timeout = self.connectionsTimeout
))
timeout = self.connectionsTimeout,
)
)

proc onClose() {.async: (raises: []).} =
try:
block:
let
fut1 = client.join()
fut2 = conn.join()
try: # https://github.com/status-im/nim-chronos/issues/516
discard await race(fut1, fut2)
except ValueError: raiseAssert("Futures list is not empty")
# at least one join() completed, cancel pending one, if any
if not fut1.finished: await fut1.cancelAndWait()
if not fut2.finished: await fut2.cancelAndWait()
await noCancel client.join()

trace "Cleaning up client", addrs = $client.remoteAddress,
conn
trace "Cleaning up client", addrs = $client.remoteAddress, conn

self.clients[dir].keepItIf( it != client )
self.clients[dir].keepItIf(it != client)

block:
let
fut1 = conn.close()
fut2 = client.closeWait()
await allFutures(fut1, fut2)
if fut1.failed:
let err = fut1.error()
debug "Error cleaning up client", errMsg = err.msg, conn
static: doAssert typeof(fut2).E is void # Cannot fail
# Propagate the chronos client being closed to the connection
# TODO This is somewhat dubious since it's the connection that owns the
# client, but it allows the transport to close all connections when
# shutting down (also dubious! it would make more sense that the owner
# of all connections closes them, or the next read detects the closed
# socket and does the right thing..)

trace "Cleaned up client", addrs = $client.remoteAddress,
conn
await conn.close()

except CancelledError as exc:
let useExc {.used.} = exc
debug "Error cleaning up client", errMsg = exc.msg, conn
trace "Cleaned up client", addrs = $client.remoteAddress, conn

self.clients[dir].add(client)

asyncSpawn onClose()

return conn

proc new*(
T: typedesc[TcpTransport],
flags: set[ServerFlags] = {},
upgrade: Upgrade,
connectionsTimeout = 10.minutes): T {.public.} =
T: typedesc[TcpTransport],
flags: set[ServerFlags] = {},
upgrade: Upgrade,
connectionsTimeout = 10.minutes,
): T {.public.} =
T(
flags: flags,
clientFlags:
if ServerFlags.TcpNoDelay in flags:
{SocketFlags.TcpNoDelay}
else:
default(set[SocketFlags])
,
upgrader: upgrade,
networkReachability: NetworkReachability.Unknown,
connectionsTimeout: connectionsTimeout,
)

let
transport = T(
flags: flags,
clientFlags:
if ServerFlags.TcpNoDelay in flags:
compilesOr:
{SocketFlags.TcpNoDelay}
do:
doAssert(false)
default(set[SocketFlags])
else:
default(set[SocketFlags]),
upgrader: upgrade,
networkReachability: NetworkReachability.Unknown,
connectionsTimeout: connectionsTimeout)
method start*(self: TcpTransport, addrs: seq[MultiAddress]): Future[void] =
## Start transport listening to the given addresses - for dial-only transports,
## start with an empty list

return transport
# TODO remove `impl` indirection throughout when `raises` is added to base

method start*(
self: TcpTransport,
addrs: seq[MultiAddress]) {.async.} =
## listen on the transport
##
proc impl(
self: TcpTransport, addrs: seq[MultiAddress]
): Future[void] {.async: (raises: [transport.TransportError, CancelledError]).} =
if self.running:
warn "TCP transport already running"
return

if self.running:
warn "TCP transport already running"
return

await procCall Transport(self).start(addrs)
trace "Starting TCP transport"
trackCounter(TcpTransportTrackerName)

for i, ma in addrs:
if not self.handles(ma):
trace "Invalid address detected, skipping!", address = ma
continue
trace "Starting TCP transport"

self.flags.incl(ServerFlags.ReusePort)
let server = createStreamServer(
ma = ma,
flags = self.flags,
udata = self)

# always get the resolved address in case we're bound to 0.0.0.0:0
self.addrs[i] = MultiAddress.init(
server.sock.getLocalAddress()
).tryGet()
var supported: seq[MultiAddress]
var initialized = false
try:
for i, ma in addrs:
if not self.handles(ma):
trace "Invalid address detected, skipping!", address = ma
continue

self.servers &= server
let
ta = initTAddress(ma).expect("valid address per handles check above")
server =
try:
createStreamServer(ta, flags = self.flags)
except common.TransportError as exc:
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)

trace "Listening on", address = ma
self.servers &= server

method stop*(self: TcpTransport) {.async.} =
## stop the transport
##
try:
trace "Listening on", address = ma
supported.add(
MultiAddress.init(server.sock.getLocalAddress()).expect(
"Can init from local address"
)
)

initialized = true
finally:
if not initialized:
# Clean up partial success on exception
await noCancel allFutures(self.servers.mapIt(it.closeWait()))
reset(self.servers)

try:
await procCall Transport(self).start(supported)
except CatchableError:
raiseAssert "Base method does not raise"

trackCounter(TcpTransportTrackerName)

impl(self, addrs)

method stop*(self: TcpTransport): Future[void] =
## Stop the transport and close all connections it created
proc impl(self: TcpTransport) {.async: (raises: []).} =
trace "Stopping TCP transport"
self.stopping = true
defer:
self.stopping = false

checkFutures(
await allFinished(
self.clients[Direction.In].mapIt(it.closeWait()) &
self.clients[Direction.Out].mapIt(it.closeWait())))
if self.running:
# Reset the running flag
try:
await noCancel procCall Transport(self).stop()
except CatchableError: # TODO remove when `accept` is annotated with raises
raiseAssert "doesn't actually raise"

if not self.running:
# Stop each server by closing the socket - this will cause all accept loops
# to fail - since the running flag has been reset, it's also safe to close
# all known clients since no more of them will be added
await noCancel allFutures(
self.servers.mapIt(it.closeWait()) &
self.clients[Direction.In].mapIt(it.closeWait()) &
self.clients[Direction.Out].mapIt(it.closeWait())
)

self.servers = @[]

for acceptFut in self.acceptFuts:
if acceptFut.completed():
await acceptFut.value().closeWait()
self.acceptFuts = @[]

if self.clients[Direction.In].len != 0 or self.clients[Direction.Out].len != 0:
# Future updates could consider turning this warn into an assert since
# it should never happen if the shutdown code is correct
warn "Couldn't clean up clients",
len = self.clients[Direction.In].len + self.clients[Direction.Out].len

trace "Transport stopped"
untrackCounter(TcpTransportTrackerName)
else:
# For legacy reasons, `stop` on a transport that wasn't started is
# expected to close outgoing connections created by the transport
warn "TCP transport already stopped"
return

await procCall Transport(self).stop() # call base
var toWait: seq[Future[void]]
for fut in self.acceptFuts:
if not fut.finished:
toWait.add(fut.cancelAndWait())
elif fut.done:
toWait.add(fut.read().closeWait())
doAssert self.clients[Direction.In].len == 0,
"No incoming connections possible without start"
await noCancel allFutures(self.clients[Direction.Out].mapIt(it.closeWait()))

for server in self.servers:
server.stop()
toWait.add(server.closeWait())
impl(self)

await allFutures(toWait)

self.servers = @[]
self.acceptFuts = @[]

trace "Transport stopped"
untrackCounter(TcpTransportTrackerName)
except CatchableError as exc:
trace "Error shutting down tcp transport", exc = exc.msg

method accept*(self: TcpTransport): Future[Connection] {.async.} =
## accept a new TCP connection
method accept*(self: TcpTransport): Future[Connection] =
## accept a new TCP connection, returning nil on non-fatal errors
##

if not self.running:
raise newTransportClosedError()

try:
if self.acceptFuts.len <= 0:
self.acceptFuts = self.servers.mapIt(Future[StreamTransport](it.accept()))
## Raises an exception when the transport is broken and cannot be used for
## accepting further connections
# TODO returning nil for non-fatal errors is problematic in that error
# information is lost and must be logged here instead of being
# available to the caller - further refactoring should propagate errors
# to the caller instead
proc impl(
self: TcpTransport
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
if not self.running:
raise newTransportClosedError()

if self.acceptFuts.len <= 0:
return
self.acceptFuts = self.servers.mapIt(it.accept())

let
finished = await one(self.acceptFuts)
finished =
try:
await one(self.acceptFuts)
except ValueError:
raise (ref TcpTransportError)(msg: "No listeners configured")

index = self.acceptFuts.find(finished)
transp =
try:
await finished
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
return nil
except TransportAbortedError as exc:
debug "Connection aborted", exc = exc.msg
return nil
except TransportUseClosedError as exc:
raise newTransportClosedError(exc)
except TransportOsError as exc:
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
except common.TransportError as exc: # Needed for chronos 4.0.0 support
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
except CancelledError as exc:
raise exc

if not self.running: # Stopped while waiting
await transp.closeWait()
raise newTransportClosedError()

self.acceptFuts[index] = self.servers[index].accept()

let transp = await finished
try:
let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet()
return await self.connHandler(transp, Opt.some(observedAddr), Direction.In)
except CancelledError as exc:
debug "CancelledError", exc = exc.msg
transp.close()
raise exc
except CatchableError as exc:
debug "Failed to handle connection", exc = exc.msg
transp.close()
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
except TransportAbortedError as exc:
debug "Connection aborted", exc = exc.msg
except TransportUseClosedError as exc:
debug "Server was closed", exc = exc.msg
raise newTransportClosedError(exc)
except CancelledError as exc:
raise exc
except TransportOsError as exc:
info "OS Error", exc = exc.msg
raise exc
except CatchableError as exc:
info "Unexpected error accepting connection", exc = exc.msg
raise exc
let remote =
try:
transp.remoteAddress
except TransportOsError as exc:
# The connection had errors / was closed before `await` returned control
await transp.closeWait()
debug "Cannot read remote address", exc = exc.msg
return nil

let observedAddr =
MultiAddress.init(remote).expect("Can initialize from remote address")
self.connHandler(transp, Opt.some(observedAddr), Direction.In)

impl(self)

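A minimal sketch (not from the diff) of how a caller might drive the accept method above, now that non-fatal errors surface as nil connections; `handleConn` is an assumed application handler:

proc acceptLoop(t: TcpTransport) {.async.} =
  while t.running:
    let conn = await t.accept()
    if conn == nil:
      continue # non-fatal error (e.g. too many open files); keep accepting
    asyncSpawn handleConn(conn) # hand the connection off to the application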
method dial*(
self: TcpTransport,
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId)): Future[Connection] {.async.} =
self: TcpTransport,
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId),
): Future[Connection] =
## dial a peer
##
proc impl(
self: TcpTransport, hostname: string, address: MultiAddress, peerId: Opt[PeerId]
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
if self.stopping:
raise newTransportClosedError()

trace "Dialing remote peer", address = $address
let transp =
if self.networkReachability == NetworkReachability.NotReachable and self.addrs.len > 0:
self.clientFlags.incl(SocketFlags.ReusePort)
await connect(address, flags = self.clientFlags, localAddress = Opt.some(self.addrs[0]))
else:
await connect(address, flags = self.clientFlags)
let ta = initTAddress(address).valueOr:
raise (ref TcpTransportError)(msg: "Unsupported address: " & $address)

try:
let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet()
return await self.connHandler(transp, Opt.some(observedAddr), Direction.Out)
except CatchableError as err:
await transp.closeWait()
raise err
trace "Dialing remote peer", address = $address
|
||||
let transp =
|
||||
try:
|
||||
await(
|
||||
if self.networkReachability == NetworkReachability.NotReachable and
|
||||
self.addrs.len > 0:
|
||||
let local = initTAddress(self.addrs[0]).expect("self address is valid")
|
||||
self.clientFlags.incl(SocketFlags.ReusePort)
|
||||
connect(ta, flags = self.clientFlags, localAddress = local)
|
||||
else:
|
||||
connect(ta, flags = self.clientFlags)
|
||||
)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
|
||||
|
||||
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||
# If `stop` is called after `connect` but before `await` returns, we might
|
||||
# end up with a race condition where `stop` returns but not all connections
|
||||
# have been closed - we drop connections in this case in order not to leak
|
||||
# them
|
||||
if self.stopping:
|
||||
# Stopped while waiting for new connection
|
||||
await transp.closeWait()
|
||||
raise newTransportClosedError()
|
||||
|
||||
let observedAddr =
|
||||
try:
|
||||
MultiAddress.init(transp.remoteAddress).expect("remote address is valid")
|
||||
except TransportOsError as exc:
|
||||
await transp.closeWait()
|
||||
raise (ref TcpTransportError)(msg: exc.msg)
|
||||
|
||||
self.connHandler(transp, Opt.some(observedAddr), Direction.Out)
|
||||
|
||||
impl(self, hostname, address, peerId)
|
||||
|
||||
method handles*(t: TcpTransport, address: MultiAddress): bool =
  if procCall Transport(t).handles(address):
    if address.protocols.isOk:
      return TCP.match(address)
    return TCP.match(address)
@@ -200,7 +200,7 @@ method dial*(

  try:
    await dialPeer(transp, address)
    return await self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out)
    return self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out)
  except CatchableError as err:
    await transp.closeWait()
    raise err

@@ -35,7 +35,7 @@ type
    upgrader*: Upgrade
    networkReachability*: NetworkReachability

proc newTransportClosedError*(parent: ref Exception = nil): ref LPError =
proc newTransportClosedError*(parent: ref Exception = nil): ref TransportError =
  newException(TransportClosedError,
    "Transport closed, no more connections!", parent)

@@ -112,6 +112,9 @@ template withValue*[T](self: Opt[T] | Option[T], value, body: untyped): untyped
    let value {.inject.} = temp.get()
    body

template withValue*[T, E](self: Result[T, E], value, body: untyped): untyped =
  self.toOpt().withValue(value, body)

macro withValue*[T](self: Opt[T] | Option[T], value, body, elseStmt: untyped): untyped =
  let elseBody = elseStmt[0]
  quote do:

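A brief illustrative sketch (not part of the diff) of what the new Result overload above enables, assuming stew/results is imported alongside this module:

let r = Result[int, string].ok(42)
r.withValue(v):
  echo "got ", v # body only runs when r holds a value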
@@ -13,6 +13,8 @@
import chronos, stew/endians2
import multiaddress, multicodec, errors, utility

export multiaddress, chronos

when defined(windows):
  import winlean
else:
@@ -30,7 +32,6 @@ const
    UDP,
  )

proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
  ## Initialize ``TransportAddress`` with MultiAddress ``ma``.
  ##
@@ -76,7 +77,7 @@ proc connect*(
child: StreamTransport = nil,
flags = default(set[SocketFlags]),
localAddress: Opt[MultiAddress] = Opt.none(MultiAddress)): Future[StreamTransport]
{.raises: [LPError, MaInvalidAddress].} =
{.async.} =
## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
@@ -88,12 +89,12 @@ proc connect*(
let transportAddress = initTAddress(ma).tryGet()

compilesOr:
return connect(transportAddress, bufferSize, child,
return await connect(transportAddress, bufferSize, child,
if localAddress.isSome(): initTAddress(localAddress.expect("just checked")).tryGet() else: TransportAddress(),
flags)
do:
# support for older chronos versions
return connect(transportAddress, bufferSize, child)
return await connect(transportAddress, bufferSize, child)

proc createStreamServer*[T](ma: MultiAddress,
cbproc: StreamCallback,
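A hedged sketch (not part of the diff) of calling the extended connect wrapper above with a pinned local endpoint, e.g. for port reuse; the multiaddresses are made up:

let remoteMa = MultiAddress.init("/ip4/192.0.2.1/tcp/4001").tryGet()
let localMa = MultiAddress.init("/ip4/0.0.0.0/tcp/4001").tryGet()
let transp = await connect(
  remoteMa, flags = {SocketFlags.ReusePort}, localAddress = Opt.some(localMa))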
tests/di/testdi.nim (new file, 46 lines)
@@ -0,0 +1,46 @@
{.used.}

# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [].}

import ../helpers
import ../../di/di

type
  MyInterface = ref object of RootObj
  MyImplementation = ref object of MyInterface
  AnotherImplementation = ref object of MyInterface

  MyObject = object

method doSomething(obj: MyInterface) {.base.} = discard

method doSomething(obj: MyImplementation) =
  echo "MyImplementation doing something!"

method doSomething(obj: AnotherImplementation) =
  echo "AnotherImplementation doing something!"

proc provideMyImplementation(): MyInterface =
  MyImplementation()

proc provideAnotherImplementation(): MyInterface =
  AnotherImplementation()

suite "DI":

  asyncTest "DI":
    let container = Container()
    register[MyInterface](container, provideMyImplementation, "myImplementation")
    register[MyInterface](container, provideAnotherImplementation, "anotherImplementation")

    let myImplementation = resolve[MyInterface](container, "anotherImplementation")
    myImplementation.doSomething()
@@ -569,8 +569,8 @@ suite "GossipSub":
    proc slowValidator(topic: string, message: Message): Future[ValidationResult] {.async.} =
      await cRelayed
      # Empty A & C caches to detect duplicates
      gossip1.seen = TimedCache[MessageId].init()
      gossip3.seen = TimedCache[MessageId].init()
      gossip1.seen = TimedCache[SaltedId].init()
      gossip3.seen = TimedCache[SaltedId].init()
      let msgId = toSeq(gossip2.validationSeen.keys)[0]
      checkUntilTimeout(try: gossip2.validationSeen[msgId].len > 0 except: false)
      result = ValidationResult.Accept

@@ -1,12 +1,11 @@
{.used.}

import unittest2, options, sets, sequtils
import unittest2, sequtils
import stew/byteutils
import ../../libp2p/[peerid,
  crypto/crypto,
  protocols/pubsub/mcache,
  protocols/pubsub/rpc/messages]
import ./utils
  protocols/pubsub/rpc/message]

var rng = newRng()

@@ -24,6 +24,8 @@ suite "TimedCache":
      2 in cache
      3 in cache

      cache.addedAt(2) == now + 3.seconds

    check:
      cache.put(2, now + 7.seconds) # refreshes 2
      not cache.put(4, now + 12.seconds) # expires 3
@@ -33,6 +35,23 @@ suite "TimedCache":
      3 notin cache
      4 in cache

    check:
      cache.del(4).isSome()
      4 notin cache

    check:
      not cache.put(100, now + 100.seconds) # expires everything
      100 in cache
      2 notin cache

  test "enough items to force cache heap storage growth":
    var cache = TimedCache[int].init(5.seconds)

    let now = Moment.now()
    for i in 101..100000:
      check:
        not cache.put(i, now)

    for i in 101..100000:
      check:
        i in cache

@@ -315,7 +315,6 @@ suite "Circuit Relay V2":
      await sleepAsync(chronos.timer.seconds(ttl + 1))

      expect(DialFailedError):
        check: conn.atEof()
        await conn.close()
        await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
        conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)

tests/testwildcardresolverservice.nim (new file, 86 lines)
@@ -0,0 +1,86 @@
{.used.}

# Nim-Libp2p
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

import std/[options, sequtils]
import stew/[byteutils]
import chronos, metrics
import unittest2
import ../libp2p/[builders, switch]
import ../libp2p/services/wildcardresolverservice
import ../libp2p/[multiaddress, multicodec]
import ./helpers
import ../di/di

type NetworkInterfaceProviderMock* = ref object of NetworkInterfaceProvider

method getAddresses(
    networkInterfaceProvider: NetworkInterfaceProviderMock, addrFamily: AddressFamily
): seq[InterfaceAddress] {.gcsafe, raises: [].} =
  echo "getAddressesMock"
  try:
    if addrFamily == AddressFamily.IPv4:
      return
        @[
          InterfaceAddress.init(initTAddress("127.0.0.1:0"), 8),
          InterfaceAddress.init(initTAddress("192.168.1.22:0"), 24),
        ]
    else:
      return
        @[
          InterfaceAddress.init(initTAddress("::1:0"), 8),
          InterfaceAddress.init(initTAddress("fe80::1:0"), 64),
        ]
  except TransportAddressError as e:
    echo "Error: " & $e.msg
    fail()

proc networkInterfaceProviderMock(): NetworkInterfaceProvider =
  NetworkInterfaceProviderMock.new()

proc createSwitch(): Switch =
  SwitchBuilder
    .new()
    .withRng(newRng())
    .withAddresses(
      @[
        MultiAddress.init("/ip4/0.0.0.0/tcp/0/").tryGet(),
        MultiAddress.init("/ip6/::/tcp/0/").tryGet(),
      ]
    )
    .withTcpTransport()
    .withMplex()
    .withNoise()
    .withBinding(networkInterfaceProviderMock)
    .build()

suite "WildcardAddressResolverService":
  teardown:
    checkTrackers()

  asyncTest "WildcardAddressResolverService must resolve wildcard addresses and stop doing so when stopped":
    let switch = createSwitch()
    await switch.start()
    let tcpIp4 = switch.peerInfo.addrs[0][multiCodec("tcp")].get # tcp port for ip4
    let tcpIp6 = switch.peerInfo.addrs[2][multiCodec("tcp")].get # tcp port for ip6

    check switch.peerInfo.addrs ==
      @[
        MultiAddress.init("/ip4/127.0.0.1" & $tcpIp4).get,
        MultiAddress.init("/ip4/192.168.1.22" & $tcpIp4).get,
        MultiAddress.init("/ip6/::1" & $tcpIp6).get,
        MultiAddress.init("/ip6/fe80::1" & $tcpIp6).get,
      ]
    await switch.stop()
    check switch.peerInfo.addrs ==
      @[
        MultiAddress.init("/ip4/0.0.0.0" & $tcpIp4).get,
        MultiAddress.init("/ip6/::" & $tcpIp6).get,
      ]
@@ -377,3 +377,24 @@ suite "Yamux":
    expect LPStreamClosedError: discard await streamA.readLp(100)
    blocker.complete()
    await streamA.close()

  asyncTest "Peer must be able to read from stream after closing it for writing":
    mSetup()

    yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
      try:
        check (await conn.readLp(100)) == fromHex("1234")
      except CancelledError, LPStreamError:
        return
      try:
        await conn.writeLp(fromHex("5678"))
      except CancelledError, LPStreamError:
        return
      await conn.close()

    let streamA = await yamuxa.newStream()
    check streamA == yamuxa.getStreams()[0]

    await streamA.writeLp(fromHex("1234"))
    await streamA.close()
    check (await streamA.readLp(100)) == fromHex("5678")

@@ -11,6 +11,6 @@ COPY . nim-libp2p/

RUN \
    cd nim-libp2p && \
    nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN --threads:off ./tests/transport-interop/main.nim
    nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim

ENTRYPOINT ["/app/nim-libp2p/tests/transport-interop/main"]