Compare commits

...

29 Commits

Author SHA1 Message Date
Diego
fdff1ebec5 use di to inject network interface provider 2024-05-26 22:24:14 +02:00
Diego
d34575675c create di 2024-05-26 22:23:25 +02:00
diegomrsantos
03f72a8b5c enable resolver by default 2024-05-26 18:29:34 +02:00
diegomrsantos
b11e2b0349 Update libp2p/services/wildcardresolverservice.nim
Co-authored-by: Ludovic Chenut <ludovic@status.im>
2024-05-24 14:13:35 +02:00
diegomrsantos
2fa2c4425f fix(yamux): set EoF when remote peer half closes the stream in yamux (#1086) 2024-05-24 14:11:27 +02:00
Diego
e228981a11 remove echo 2024-05-23 17:10:37 +02:00
Diego
907c41a491 remove toOpt 2024-05-23 17:09:56 +02:00
Diego
03668a3e90 formatting 2024-05-23 17:07:18 +02:00
Diego
980950b147 make addr provider a proc 2024-05-23 17:06:25 +02:00
Diego
eb6a9f2ff1 add docs 2024-05-22 15:28:38 +02:00
Diego
7810dba9d6 remove redundant test 2024-05-17 14:58:26 +02:00
Diego
6d327c37c4 remove peer id concat 2024-05-17 14:51:04 +02:00
Diego
21cb13a8ff remove unnecessary check 2024-05-17 01:21:11 +02:00
Diego
1b2b009f79 move peer id concatenation to switch 2024-05-17 01:19:23 +02:00
Diego
3a659ffddb removes scheduler and some cleanup 2024-05-16 19:20:30 +02:00
Diego
ac994a8f15 add doc comments 2024-05-16 18:39:58 +02:00
Diego
ee8318ec42 add doc comments 2024-05-16 16:08:33 +02:00
diegomrsantos
52a8870f78 Merge branch 'master' into anyaddr-resolver 2024-05-15 20:53:59 +02:00
kaiserd
0911cb20f4 chore(gossipsub): cleanups (#1096) 2024-05-15 18:57:15 +02:00
Diego
d7c0486968 add withvalue for result 2024-05-15 16:14:37 +02:00
Diego
63b6390d1a add test 2024-05-15 16:14:23 +02:00
Diego
cf7b77bf82 add wildcard addr resolver 2024-05-15 16:14:18 +02:00
Jacek Sieka
3ca49a2f40 fix(transport): various tcp transport races (#1095)
Co-authored-by: diegomrsantos <diego@status.im>
2024-05-14 07:10:34 +02:00
diegomrsantos
1b91b97499 fix(CI): rename branch from unstable to master in bumper workflow (#1097) 2024-05-10 15:42:43 +02:00
Jacek Sieka
21cbe3a91a chore: cleanups (#1092)
* remove cruft
* remove redundant error handling (reduces warnings)
* remove redundant copying
2024-05-08 14:33:26 +02:00
diegomrsantos
88e233db81 fix: Asynchronous task [sendMsgSlow()] was cancelled [FutureDefect] (#1094) 2024-05-07 15:44:14 +02:00
Jacek Sieka
84659af45b avoid latency/copy when sending low-priority messages to fast peers (#1060) 2024-05-02 12:26:16 +02:00
Jacek Sieka
aef44ed1ce salt idontwant (#1090) 2024-05-02 12:18:55 +02:00
Jacek Sieka
02c96fc003 Improve memory efficiency of seen cache (#1073) 2024-05-01 18:38:24 +02:00
36 changed files with 934 additions and 394 deletions

View File

@@ -2,8 +2,7 @@ name: Bumper
on:
push:
branches:
- unstable
- bumper
- master
workflow_dispatch:
jobs:

View File

@@ -7,7 +7,6 @@ if dirExists("nimbledeps/pkgs2"):
switch("warning", "CaseTransition:off")
switch("warning", "ObservableStores:off")
switch("warning", "LockLevel:off")
--define:chronosStrictException
--styleCheck:usages
switch("warningAsError", "UseBase:on")
--styleCheck:error

24
di/di.nim Normal file
View File

@@ -0,0 +1,24 @@
import typetraits
import tables
type
BindingKey = tuple[typeName: string, qualifier: string]
Container* = ref object
bindings*: Table[BindingKey, proc(): RootRef {.gcsafe, raises: [].}]
BindingNotFoundError* = object of CatchableError
proc register*[T](c: Container, implementation: proc(): T {.gcsafe, raises: [].}, qualifier: string = "") =
let key: BindingKey = (name(T), qualifier)
proc p(): RootRef =
let o: RootRef = implementation()
return o
c.bindings[key] = p
proc resolve*[T](c: Container, qualifier: string = ""): T {.raises: [BindingNotFoundError]} =
let key: BindingKey = (name(T), qualifier)
try:
return cast[T](c.bindings[key]())
except KeyError:
raise newException(BindingNotFoundError, "Type not bound: " & name(T))

View File

@@ -19,7 +19,8 @@ runnableExamples:
{.push raises: [].}
import
options, tables, chronos, chronicles, sequtils,
options, tables, chronos, chronicles, sequtils
import
switch, peerid, peerinfo, stream/connection, multiaddress,
crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex, yamux/yamux],
@@ -28,6 +29,8 @@ import
connmanager, upgrademngrs/muxedupgrade, observedaddrmanager,
nameresolving/nameresolver,
errors, utility
import services/wildcardresolverservice
import ../di/di
export
switch, peerid, peerinfo, connection, multiaddress, crypto, errors
@@ -59,6 +62,8 @@ type
rdv: RendezVous
services: seq[Service]
observedAddrManager: ObservedAddrManager
enableWildcardResolver: bool
container*: Container
proc new*(T: type[SwitchBuilder]): T {.public.} =
## Creates a SwitchBuilder
@@ -67,7 +72,7 @@ proc new*(T: type[SwitchBuilder]): T {.public.} =
.init("/ip4/127.0.0.1/tcp/0")
.expect("Should initialize to default")
SwitchBuilder(
let sb = SwitchBuilder(
privKey: none(PrivateKey),
addresses: @[address],
secureManagers: @[],
@@ -76,7 +81,12 @@ proc new*(T: type[SwitchBuilder]): T {.public.} =
maxOut: -1,
maxConnsPerPeer: MaxConnectionsPerPeer,
protoVersion: ProtoVersion,
agentVersion: AgentVersion)
agentVersion: AgentVersion,
container: Container())
register[NetworkInterfaceProvider](sb.container, networkInterfaceProvider)
sb
proc withPrivateKey*(b: SwitchBuilder, privateKey: PrivateKey): SwitchBuilder {.public.} =
## Set the private key of the switch. Will be used to
@@ -85,20 +95,19 @@ proc withPrivateKey*(b: SwitchBuilder, privateKey: PrivateKey): SwitchBuilder {.
b.privKey = some(privateKey)
b
proc withAddress*(b: SwitchBuilder, address: MultiAddress): SwitchBuilder {.public.} =
## | Set the listening address of the switch
## | Calling it multiple time will override the value
b.addresses = @[address]
b
proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress]): SwitchBuilder {.public.} =
proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress], enableWildcardResolver: bool = true): SwitchBuilder {.public.} =
## | Set the listening addresses of the switch
## | Calling it multiple time will override the value
b.addresses = addresses
b.enableWildcardResolver = enableWildcardResolver
b
proc withAddress*(b: SwitchBuilder, address: MultiAddress, enableWildcardResolver: bool = true): SwitchBuilder {.public.} =
## | Set the listening address of the switch
## | Calling it multiple time will override the value
b.withAddresses(@[address], enableWildcardResolver)
proc withSignedPeerRecord*(b: SwitchBuilder, sendIt = true): SwitchBuilder {.public.} =
b.sendSignedPeerRecord = sendIt
b
@@ -209,6 +218,10 @@ proc withObservedAddrManager*(b: SwitchBuilder, observedAddrManager: ObservedAdd
b.observedAddrManager = observedAddrManager
b
proc withBinding*[T](b: SwitchBuilder, binding: proc(): T {.gcsafe, raises: [].}): SwitchBuilder =
register[T](b.container, binding)
b
proc build*(b: SwitchBuilder): Switch
{.raises: [LPError], public.} =
@@ -261,6 +274,12 @@ proc build*(b: SwitchBuilder): Switch
else:
PeerStore.new(identify)
try:
let networkInterfaceProvider = resolve[NetworkInterfaceProvider](b.container)
b.services.add(WildcardAddressResolverService.new(networkInterfaceProvider))
except BindingNotFoundError as e:
raise newException(LPError, "Cannot resolve NetworkInterfaceProvider", e)
let switch = newSwitch(
peerInfo = peerInfo,
transports = transports,
@@ -312,7 +331,7 @@ proc newStandardSwitch*(
let addrs = when addrs is MultiAddress: @[addrs] else: addrs
var b = SwitchBuilder
.new()
.withAddresses(addrs)
.withAddresses(addrs, true)
.withRng(rng)
.withSignedPeerRecord(sendSignedPeerRecord)
.withMaxConnections(maxConnections)

View File

@@ -81,16 +81,18 @@ proc dialAndUpgrade(
if dialed.dir != dir:
dialed.dir = dir
await transport.upgrade(dialed, peerId)
except CancelledError as exc:
await dialed.close()
raise exc
except CatchableError as exc:
# If we failed to establish the connection through one transport,
# we won't succeeded through another - no use in trying again
await dialed.close()
debug "Connection upgrade failed", err = exc.msg, peerId = peerId.get(default(PeerId))
if exc isnot CancelledError:
if dialed.dir == Direction.Out:
libp2p_failed_upgrades_outgoing.inc()
else:
libp2p_failed_upgrades_incoming.inc()
if dialed.dir == Direction.Out:
libp2p_failed_upgrades_outgoing.inc()
else:
libp2p_failed_upgrades_incoming.inc()
# Try other address
return nil

View File

@@ -44,12 +44,3 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
# We still don't abort but warn
debug "A future has failed, enable trace logging for details", error=exc.name
trace "Exception details", msg=exc.msg
template tryAndWarn*(message: static[string]; body: untyped): untyped =
try:
body
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "An exception has ocurred, enable trace logging for details", name = exc.name, msg = message
trace "Exception details", exc = exc.msg

View File

@@ -13,12 +13,12 @@
{.push public.}
import pkg/chronos, chronicles
import std/[nativesockets, hashes]
import tables, strutils, sets, stew/shims/net
import std/[nativesockets, net, hashes]
import tables, strutils, sets
import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
protobuf/minprotobuf, errors, utility
import stew/[base58, base32, endians2, results]
export results, minprotobuf, vbuffer, utility
export results, minprotobuf, vbuffer, utility, multicodec
logScope:
topics = "libp2p multiaddress"

View File

@@ -266,6 +266,7 @@ proc addHandler*[E](
proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([])`
# TODO https://github.com/nim-lang/Nim/issues/23445
var futs = newSeqOfCap[Future[void].Raising([CancelledError])](m.handlers.len)
for it in m.handlers:
futs.add it.protocol.start()
@@ -278,7 +279,7 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
doAssert m.handlers.len == futs.len, "Handlers modified while starting"
for i, fut in futs:
if not fut.finished:
pending.add noCancel fut.cancelAndWait()
pending.add fut.cancelAndWait()
elif fut.completed:
pending.add m.handlers[i].protocol.stop()
else:
@@ -286,7 +287,6 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
await noCancel allFutures(pending)
raise exc
proc stop*(m: MultistreamSelect) {.async: (raises: []).} =
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])`
var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len)

View File

@@ -164,7 +164,6 @@ type
closedRemotely: Future[void].Raising([])
closedLocally: bool
receivedData: AsyncEvent
returnedEof: bool
proc `$`(channel: YamuxChannel): string =
result = if channel.conn.dir == Out: "=> " else: "<= "
@@ -204,8 +203,8 @@ proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedLocally:
trace "Closing yamux channel locally", streamId = channel.id, conn = channel.conn
channel.closedLocally = true
channel.isEof = true
if not channel.isReset and channel.sendQueue.len == 0:
try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
@@ -273,7 +272,7 @@ method readOnce*(
newLPStreamClosedError()
else:
newLPStreamConnDownError()
if channel.returnedEof:
if channel.isEof:
raise newLPStreamRemoteClosedError()
if channel.recvQueue.len == 0:
channel.receivedData.clear()
@@ -281,9 +280,8 @@ method readOnce*(
discard await race(channel.closedRemotely, channel.receivedData.wait())
except ValueError: raiseAssert("Futures list is not empty")
if channel.closedRemotely.completed() and channel.recvQueue.len == 0:
channel.returnedEof = true
channel.isEof = true
return 0
return 0 # we return 0 to indicate that the channel is closed for reading from now on
let toRead = min(channel.recvQueue.len, nbytes)

View File

@@ -24,12 +24,16 @@ type
AddressMapper* =
proc(listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]]
{.gcsafe, raises: [].}
## A proc that expected to resolve the listen addresses into dialable addresses
PeerInfo* {.public.} = ref object
peerId*: PeerId
listenAddrs*: seq[MultiAddress]
## contains addresses the node listens on, which may include wildcard and private addresses (not directly reachable).
addrs: seq[MultiAddress]
## contains resolved addresses that other peers can use to connect, including public-facing NAT and port-forwarded addresses.
addressMappers*: seq[AddressMapper]
## contains a list of procs that can be used to resolve the listen addresses into dialable addresses.
protocols*: seq[string]
protoVersion*: string
agentVersion*: string

View File

@@ -56,7 +56,7 @@ method init*(p: Ping) =
trace "handling ping", conn
var buf: array[PingSize, byte]
await conn.readExactly(addr buf[0], PingSize)
trace "echoing ping", conn
trace "echoing ping", conn, pingData = @buf
await conn.write(@buf)
if not isNil(p.pingHandler):
await p.pingHandler(conn.peerId)

View File

@@ -16,6 +16,7 @@ import ./pubsub,
./timedcache,
./peertable,
./rpc/[message, messages, protobuf],
nimcrypto/[hash, sha2],
../../crypto/crypto,
../../stream/connection,
../../peerid,
@@ -32,25 +33,34 @@ const FloodSubCodec* = "/floodsub/1.0.0"
type
FloodSub* {.public.} = ref object of PubSub
floodsub*: PeerTable # topic to remote peer map
seen*: TimedCache[MessageId] # message id:s already seen on the network
seenSalt*: seq[byte]
seen*: TimedCache[SaltedId]
# Early filter for messages recently observed on the network
# We use a salted id because the messages in this cache have not yet
# been validated meaning that an attacker has greater control over the
# hash key and therefore could poison the table
seenSalt*: sha256
# The salt in this case is a partially updated SHA256 context pre-seeded
# with some random data
proc hasSeen*(f: FloodSub, msgId: MessageId): bool =
f.seenSalt & msgId in f.seen
proc salt*(f: FloodSub, msgId: MessageId): SaltedId =
var tmp = f.seenSalt
tmp.update(msgId)
SaltedId(data: tmp.finish())
proc addSeen*(f: FloodSub, msgId: MessageId): bool =
# Salting the seen hash helps avoid attacks against the hash function used
# in the nim hash table
proc hasSeen*(f: FloodSub, saltedId: SaltedId): bool =
saltedId in f.seen
proc addSeen*(f: FloodSub, saltedId: SaltedId): bool =
# Return true if the message has already been seen
f.seen.put(f.seenSalt & msgId)
f.seen.put(saltedId)
proc firstSeen*(f: FloodSub, msgId: MessageId): Moment =
f.seen.addedAt(f.seenSalt & msgId)
proc firstSeen*(f: FloodSub, saltedId: SaltedId): Moment =
f.seen.addedAt(saltedId)
proc handleSubscribe*(f: FloodSub,
peer: PubSubPeer,
topic: string,
subscribe: bool) =
proc handleSubscribe(f: FloodSub,
peer: PubSubPeer,
topic: string,
subscribe: bool) =
logScope:
peer
topic
@@ -96,10 +106,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =
method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
data: seq[byte]) {.async.} =
var rpcMsg = decodeRpcMsg(data).valueOr:
debug "failed to decode msg from peer", peer, err = error
raise newException(CatchableError, "")
raise newException(CatchableError, "Peer msg couldn't be decoded")
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
# trigger hooks
@@ -117,9 +126,11 @@ method rpcHandler*(f: FloodSub,
# TODO: descore peers due to error during message validation (malicious?)
continue
let msgId = msgIdResult.get
let
msgId = msgIdResult.get
saltedId = f.salt(msgId)
if f.addSeen(msgId):
if f.addSeen(saltedId):
trace "Dropping already-seen message", msgId, peer
continue
@@ -216,7 +227,7 @@ method publish*(f: FloodSub,
trace "Created new message",
msg = shortLog(msg), peers = peers.len, topic, msgId
if f.addSeen(msgId):
if f.addSeen(f.salt(msgId)):
# custom msgid providers might cause this
trace "Dropping already-seen message", msgId, topic
return 0
@@ -234,8 +245,11 @@ method publish*(f: FloodSub,
method initPubSub*(f: FloodSub)
{.raises: [InitializationError].} =
procCall PubSub(f).initPubSub()
f.seen = TimedCache[MessageId].init(2.minutes)
f.seenSalt = newSeqUninitialized[byte](sizeof(Hash))
hmacDrbgGenerate(f.rng[], f.seenSalt)
f.seen = TimedCache[SaltedId].init(2.minutes)
f.seenSalt.init()
var tmp: array[32, byte]
hmacDrbgGenerate(f.rng[], tmp)
f.seenSalt.update(tmp)
f.init()

View File

@@ -266,10 +266,10 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
procCall FloodSub(g).unsubscribePeer(peer)
proc handleSubscribe*(g: GossipSub,
peer: PubSubPeer,
topic: string,
subscribe: bool) =
proc handleSubscribe(g: GossipSub,
peer: PubSubPeer,
topic: string,
subscribe: bool) =
logScope:
peer
topic
@@ -360,13 +360,13 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
proc validateAndRelay(g: GossipSub,
msg: Message,
msgId, msgIdSalted: MessageId,
msgId: MessageId, saltedId: SaltedId,
peer: PubSubPeer) {.async.} =
try:
let validation = await g.validate(msg)
var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(msgIdSalted, seenPeers)
discard g.validationSeen.pop(saltedId, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
libp2p_gossipsub_saved_bytes.inc((msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"])
@@ -395,9 +395,7 @@ proc validateAndRelay(g: GossipSub,
g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
# add direct peers
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(topic))
g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[])
# Don't send it to source peer, or peers that
# sent it during validation
@@ -413,14 +411,13 @@ proc validateAndRelay(g: GossipSub,
for peer in toSendPeers:
for heDontWant in peer.heDontWants:
if msgId in heDontWant:
if saltedId in heDontWant:
seenPeers.incl(peer)
libp2p_gossipsub_idontwant_saved_messages.inc
libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"])
break
toSendPeers.excl(seenPeers)
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
@@ -469,6 +466,11 @@ method rpcHandler*(g: GossipSub,
var rpcMsg = decodeRpcMsg(data).valueOr:
debug "failed to decode msg from peer", peer, err = error
await rateLimit(g, peer, msgSize)
# Raising in the handler closes the gossipsub connection (but doesn't
# disconnect the peer!)
# TODO evaluate behaviour penalty values
peer.behaviourPenalty += 0.1
raise newException(CatchableError, "Peer msg couldn't be decoded")
when defined(libp2p_expensive_metrics):
@@ -478,12 +480,13 @@ method rpcHandler*(g: GossipSub,
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))
# trigger hooks
# trigger hooks - these may modify the message
peer.recvObservers(rpcMsg)
if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
g.handleSubscribe(peer, sub.topic, sub.subscribe)
@@ -503,17 +506,15 @@ method rpcHandler*(g: GossipSub,
if msgIdResult.isErr:
debug "Dropping message due to failed message id generation",
error = msgIdResult.error
# TODO: descore peers due to error during message validation (malicious?)
await g.punishInvalidMessage(peer, msg)
continue
let
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
msgIdSalted = g.salt(msgId)
topic = msg.topic
# addSeen adds salt to msgId to avoid
# remote attacking the hash function
if g.addSeen(msgId):
if g.addSeen(msgIdSalted):
trace "Dropping already-seen message", msgId = shortLog(msgId), peer
var alreadyReceived = false
@@ -523,7 +524,7 @@ method rpcHandler*(g: GossipSub,
alreadyReceived = true
if not alreadyReceived:
let delay = Moment.now() - g.firstSeen(msgId)
let delay = Moment.now() - g.firstSeen(msgIdSalted)
g.rewardDelivered(peer, topic, false, delay)
libp2p_gossipsub_duplicate.inc()
@@ -600,25 +601,24 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
g.mesh.del(topic)
# Send unsubscribe (in reverse order to sub/graft)
procCall PubSub(g).onTopicSubscription(topic, subscribed)
method publish*(g: GossipSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(g).publish(topic, data)
logScope:
topic
trace "Publishing message on topic", data = data.shortLog
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish"
return 0
# base returns always 0
discard await procCall PubSub(g).publish(topic, data)
trace "Publishing message on topic", data = data.shortLog
var peers: HashSet[PubSubPeer]
# add always direct peers
@@ -631,38 +631,39 @@ method publish*(g: GossipSub,
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
# but a peer's own messages will always be published to all known peers in the topic, limited
# to the amount of peers we can send it to in one heartbeat
var maxPeersToFlodOpt: Opt[int64]
if g.parameters.bandwidthEstimatebps > 0:
let
bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
msToTransmit = max(data.len div bandwidth, 1)
maxPeersToFlodOpt = Opt.some(max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow))
let maxPeersToFlood =
if g.parameters.bandwidthEstimatebps > 0:
let
bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
msToTransmit = max(data.len div bandwidth, 1)
max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)
else:
int.high() # unlimited
for peer in g.gossipsub.getOrDefault(topic):
maxPeersToFlodOpt.withValue(maxPeersToFlod):
if peers.len >= maxPeersToFlod: break
if peers.len >= maxPeersToFlood: break
if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer
peers.incl(peer)
if peers.len < g.parameters.dLow:
# not subscribed, or bad mesh, send to fanout peers
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
if fanoutPeers.len < g.parameters.dLow:
g.replenishFanout(topic)
fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
elif peers.len < g.parameters.dLow:
# not subscribed or bad mesh, send to fanout peers
# when flood-publishing, fanout won't help since all potential peers have
# already been added
g.replenishFanout(topic) # Make sure fanout is populated
var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
g.rng.shuffle(fanoutPeers)
for fanPeer in fanoutPeers:
peers.incl(fanPeer)
if peers.len > g.parameters.d: break
# even if we couldn't publish,
# we still attempted to publish
# on the topic, so it makes sense
# to update the last topic publish
# time
# Attempting to publish counts as fanout send (even if the message
# ultimately is not sent)
g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)
if peers.len == 0:
@@ -690,8 +691,10 @@ method publish*(g: GossipSub,
trace "Created new message", msg = shortLog(msg), peers = peers.len
if g.addSeen(msgId):
# custom msgid providers might cause this
if g.addSeen(g.salt(msgId)):
# If the message was received or published recently, don't re-publish it -
# this might happen when not using sequence id:s and / or with a custom
# message id provider
trace "Dropping already-seen message"
return 0
@@ -779,7 +782,7 @@ method initPubSub*(g: GossipSub)
raise newException(InitializationError, $validationRes.error)
# init the floodsub stuff here, we customize timedcache in gossip!
g.seen = TimedCache[MessageId].init(g.parameters.seenTTL)
g.seen = TimedCache[SaltedId].init(g.parameters.seenTTL)
# init gossip stuff
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)

View File

@@ -30,7 +30,7 @@ declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh wi
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [].} =
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) =
g.withPeerStats(p.peerId) do (stats: var PeerStats):
var info = stats.topicInfos.getOrDefault(topic)
info.graftTime = Moment.now()
@@ -46,7 +46,7 @@ proc pruned*(g: GossipSub,
p: PubSubPeer,
topic: string,
setBackoff: bool = true,
backoff = none(Duration)) {.raises: [].} =
backoff = none(Duration)) =
if setBackoff:
let
backoffDuration = backoff.get(g.parameters.pruneBackoff)
@@ -70,7 +70,7 @@ proc pruned*(g: GossipSub,
trace "pruned", peer=p, topic
proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
proc handleBackingOff*(t: var BackoffTable, topic: string) =
let now = Moment.now()
var expired = toSeq(t.getOrDefault(topic).pairs())
expired.keepIf do (pair: tuple[peer: PeerId, expire: Moment]) -> bool:
@@ -79,7 +79,7 @@ proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
t.withValue(topic, v):
v[].del(peer)
proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: [].} =
proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
if not g.parameters.enablePX:
return @[]
var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
@@ -100,7 +100,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:
proc handleGraft*(g: GossipSub,
peer: PubSubPeer,
grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
grafts: seq[ControlGraft]): seq[ControlPrune] =
var prunes: seq[ControlPrune]
for graft in grafts:
let topic = graft.topicID
@@ -204,8 +204,7 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRe
routingRecords
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [].} =
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes:
let topic = prune.topicID
@@ -239,7 +238,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
proc handleIHave*(g: GossipSub,
peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant {.raises: [].} =
ihaves: seq[ControlIHave]): ControlIWant =
var res: ControlIWant
if peer.score < g.parameters.gossipThreshold:
trace "ihave: ignoring low score peer", peer, score = peer.score
@@ -251,7 +250,7 @@ proc handleIHave*(g: GossipSub,
peer, topicID = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.topics:
for msgId in ihave.messageIDs:
if not g.hasSeen(msgId):
if not g.hasSeen(g.salt(msgId)):
if peer.iHaveBudget <= 0:
break
elif msgId notin res.messageIDs:
@@ -269,12 +268,11 @@ proc handleIDontWant*(g: GossipSub,
for dontWant in iDontWants:
for messageId in dontWant.messageIDs:
if peer.heDontWants[^1].len > 1000: break
if messageId.len > 100: continue
peer.heDontWants[^1].incl(messageId)
peer.heDontWants[^1].incl(g.salt(messageId))
proc handleIWant*(g: GossipSub,
peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
iwants: seq[ControlIWant]): seq[Message] =
var
messages: seq[Message]
invalidRequests = 0
@@ -300,7 +298,7 @@ proc handleIWant*(g: GossipSub,
messages.add(msg)
return messages
proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
proc commitMetrics(metrics: var MeshMetrics) =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
@@ -309,7 +307,7 @@ proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) {.raises: [].} =
proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
logScope:
topic
mesh = g.mesh.peers(topic)
@@ -539,7 +537,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune, isHighPriority = true)
proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
proc dropFanoutPeers*(g: GossipSub) =
# drop peers that we haven't published to in
# GossipSubFanoutTTL seconds
let now = Moment.now()
@@ -552,7 +550,7 @@ proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
for topic in drops:
g.lastFanoutPubSub.del topic
proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
proc replenishFanout*(g: GossipSub, topic: string) =
## get fanout peers for a topic
logScope: topic
trace "about to replenish fanout"
@@ -568,7 +566,7 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: [].} =
proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] =
## gossip iHave messages to peers
##
@@ -612,26 +610,25 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
x notin gossipPeers and
x.score >= g.parameters.gossipThreshold
var target = g.parameters.dLazy
let factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
if factor > target:
target = min(factor, allPeers.len)
# https://github.com/libp2p/specs/blob/98c5aa9421703fc31b0833ad8860a55db15be063/pubsub/gossipsub/gossipsub-v1.1.md#adaptive-gossip-dissemination
let
factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
target = max(g.parameters.dLazy, factor)
if target < allPeers.len:
g.rng.shuffle(allPeers)
allPeers.setLen(target)
let msgIdsAsSet = ihave.messageIDs.toHashSet()
for peer in allPeers:
control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
peer.sentIHaves[^1].incl(msgIdsAsSet)
for msgId in ihave.messageIDs:
peer.sentIHaves[^1].incl(msgId)
libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
return control
proc onHeartbeat(g: GossipSub) {.raises: [].} =
proc onHeartbeat(g: GossipSub) =
# reset IWANT budget
# reset IHAVE cap
block:
@@ -639,7 +636,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
peer.sentIHaves.addFirst(default(HashSet[MessageId]))
if peer.sentIHaves.len > g.parameters.historyLength:
discard peer.sentIHaves.popLast()
peer.heDontWants.addFirst(default(HashSet[MessageId]))
peer.heDontWants.addFirst(default(HashSet[SaltedId]))
if peer.heDontWants.len > g.parameters.historyLength:
discard peer.heDontWants.popLast()
peer.iHaveBudget = IHavePeerBudget
@@ -695,8 +692,6 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
g.mcache.shift() # shift the cache
# {.pop.} # raises []
proc heartbeat*(g: GossipSub) {.async.} =
heartbeat "GossipSub", g.parameters.heartbeatInterval:
trace "running heartbeat", instance = cast[int](g)

View File

@@ -87,8 +87,6 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
else:
0.0
{.pop.}
proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
try:
await g.switch.disconnect(peer.peerId)

View File

@@ -156,7 +156,7 @@ type
maxNumElementsInNonPriorityQueue*: int
BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]]
RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]]
RoutingRecordsHandler* =
@@ -172,8 +172,6 @@ type
subscribedDirectPeers*: PeerTable # directpeers that we keep alive
backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
gossip*: Table[string, seq[ControlIHave]] # pending gossip
control*: Table[string, ControlMessage] # pending control messages
mcache*: MCache # messages cache
validationSeen*: ValidationSeenTable # peers who sent us message in validation
heartbeatFut*: Future[void] # cancellation future for heartbeat interval

View File

@@ -9,50 +9,57 @@
{.push raises: [].}
import std/[sets, tables, options]
import std/[sets, tables]
import rpc/[messages]
import results
export sets, tables, messages, options
export sets, tables, messages, results
type
CacheEntry* = object
mid*: MessageId
msgId*: MessageId
topic*: string
MCache* = object of RootObj
msgs*: Table[MessageId, Message]
history*: seq[seq[CacheEntry]]
pos*: int
windowSize*: Natural
func get*(c: MCache, mid: MessageId): Option[Message] =
if mid in c.msgs:
try: some(c.msgs[mid])
func get*(c: MCache, msgId: MessageId): Opt[Message] =
if msgId in c.msgs:
try: Opt.some(c.msgs[msgId])
except KeyError: raiseAssert "checked"
else:
none(Message)
Opt.none(Message)
func contains*(c: MCache, mid: MessageId): bool =
mid in c.msgs
func contains*(c: MCache, msgId: MessageId): bool =
msgId in c.msgs
func put*(c: var MCache, msgId: MessageId, msg: Message) =
if not c.msgs.hasKeyOrPut(msgId, msg):
# Only add cache entry if the message was not already in the cache
c.history[0].add(CacheEntry(mid: msgId, topic: msg.topic))
c.history[c.pos].add(CacheEntry(msgId: msgId, topic: msg.topic))
func window*(c: MCache, topic: string): HashSet[MessageId] =
let
len = min(c.windowSize, c.history.len)
for i in 0..<len:
for entry in c.history[i]:
# Work backwards from `pos` in the circular buffer
for entry in c.history[(c.pos + c.history.len - i) mod c.history.len]:
if entry.topic == topic:
result.incl(entry.mid)
result.incl(entry.msgId)
func shift*(c: var MCache) =
for entry in c.history.pop():
c.msgs.del(entry.mid)
# Shift circular buffer to write to a new position, clearing it from past
# iterations
c.pos = (c.pos + 1) mod c.history.len
c.history.insert(@[])
for entry in c.history[c.pos]:
c.msgs.del(entry.msgId)
reset(c.history[c.pos])
func init*(T: type MCache, window, history: Natural): T =
T(

View File

@@ -30,7 +30,6 @@ import ./errors as pubsub_errors,
../../errors,
../../utility
import metrics
import stew/results
export results

View File

@@ -80,7 +80,10 @@ type
score*: float64
sentIHaves*: Deque[HashSet[MessageId]]
heDontWants*: Deque[HashSet[MessageId]]
heDontWants*: Deque[HashSet[SaltedId]]
## IDONTWANT contains unvalidated message id:s which may be long and/or
## expensive to look up, so we apply the same salting to them as during
## unvalidated message processing
iHaveBudget*: int
pingBudget*: int
maxMessageSize: int
@@ -301,7 +304,7 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
await p.connectedFut
discard await race(p.connectedFut)
var conn = p.sendConn
if conn == nil or conn.closed():
@@ -336,14 +339,21 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
## priority messages have been sent.
doAssert(not isNil(p), "pubsubpeer nil!")
p.clearSendPriorityQueue()
# When queues are empty, skipping the non-priority queue for low priority
# messages reduces latency
let emptyQueues =
(p.rpcmessagequeue.sendPriorityQueue.len() +
p.rpcmessagequeue.nonPriorityQueue.len()) == 0
if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
Future[void].completed()
elif msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
Future[void].completed()
elif isHighPriority:
p.clearSendPriorityQueue()
elif isHighPriority or emptyQueues:
let f = p.sendMsg(msg)
if not f.finished:
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
@@ -504,5 +514,5 @@ proc new*(
maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[SaltedId]))
result.startSendNonPriorityTask()

View File

@@ -9,8 +9,8 @@
{.push raises: [].}
import options, sequtils, sugar
import "../../.."/[
import options, sequtils
import ../../../[
peerid,
routing_record,
utility
@@ -37,6 +37,12 @@ type
MessageId* = seq[byte]
SaltedId* = object
# Salted hash of message ID - used instead of the ordinary message ID to
# avoid hash poisoning attacks and to make memory usage more predictable
# with respect to the variable-length message id
data*: MDigest[256]
Message* = object
fromPeer*: PeerId
data*: seq[byte]

View File

@@ -9,12 +9,13 @@
{.push raises: [].}
import std/[tables]
import std/[hashes, sets]
import chronos/timer, stew/results
import ../../utility
export results
const Timeout* = 10.seconds # default timeout in ms
type
@@ -26,20 +27,38 @@ type
TimedCache*[K] = object of RootObj
head, tail: TimedEntry[K] # nim linked list doesn't allow inserting at pos
entries: Table[K, TimedEntry[K]]
entries: HashSet[TimedEntry[K]]
timeout: Duration
func `==`*[E](a, b: TimedEntry[E]): bool =
if isNil(a) == isNil(b):
isNil(a) or a.key == b.key
else:
false
func hash*(a: TimedEntry): Hash =
if isNil(a):
default(Hash)
else:
hash(a[].key)
func expire*(t: var TimedCache, now: Moment = Moment.now()) =
while t.head != nil and t.head.expiresAt < now:
t.entries.del(t.head.key)
t.entries.excl(t.head)
t.head.prev = nil
t.head = t.head.next
if t.head == nil: t.tail = nil
func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] =
# Removes existing key from cache, returning the previous value if present
var item: TimedEntry[K]
if t.entries.pop(key, item):
let tmp = TimedEntry[K](key: key)
if tmp in t.entries:
let item = try:
t.entries[tmp] # use the shared instance in the set
except KeyError:
raiseAssert "just checked"
t.entries.excl(item)
if t.head == item: t.head = item.next
if t.tail == item: t.tail = item.prev
@@ -55,14 +74,14 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
# refreshed.
t.expire(now)
var previous = t.del(k) # Refresh existing item
var addedAt = now
previous.withValue(previous):
addedAt = previous.addedAt
let
previous = t.del(k) # Refresh existing item
addedAt = if previous.isSome():
previous[].addedAt
else:
now
let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout)
if t.head == nil:
t.tail = node
t.head = t.tail
@@ -83,16 +102,24 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
if cur == t.tail:
t.tail = node
t.entries[k] = node
t.entries.incl(node)
previous.isSome()
func contains*[K](t: TimedCache[K], k: K): bool =
k in t.entries
let tmp = TimedEntry[K](key: k)
tmp in t.entries
func addedAt*[K](t: TimedCache[K], k: K): Moment =
t.entries.getOrDefault(k).addedAt
func addedAt*[K](t: var TimedCache[K], k: K): Moment =
let tmp = TimedEntry[K](key: k)
try:
if tmp in t.entries: # raising is slow
# Use shared instance from entries
return t.entries[tmp][].addedAt
except KeyError:
raiseAssert "just checked"
default(Moment)
func init*[K](T: type TimedCache[K], timeout: Duration = Timeout): T =
T(

View File

@@ -0,0 +1,213 @@
# Nim-LibP2P
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
import std/sequtils
import stew/[byteutils, results, endians2]
import chronos, chronos/transports/[osnet, ipnet], chronicles
import ../[multiaddress, multicodec]
import ../switch
logScope:
topics = "libp2p wildcardresolverservice"
type
WildcardAddressResolverService* = ref object of Service
## Service used to resolve wildcard addresses of the type "0.0.0.0" for IPv4 or "::" for IPv6.
## When used with a `Switch`, this service will be automatically set up and stopped
## when the switch starts and stops. This is facilitated by adding the service to the switch's
## list of services using the `.withServices(@[svc])` method in the `SwitchBuilder`.
networkInterfaceProvider: NetworkInterfaceProvider
## Provides a list of network addresses.
addressMapper: AddressMapper
## An implementation of an address mapper that takes a list of listen addresses and expands each wildcard address
## to the respective list of interface addresses. As an example, if the listen address is 0.0.0.0:4001
## and the machine has 2 interfaces with IPs 172.217.11.174 and 64.233.177.113, the address mapper will
## expand the wildcard address to 172.217.11.174:4001 and 64.233.177.113:4001.
NetworkInterfaceProvider* = ref object of RootObj
proc isLoopbackOrUp(networkInterface: NetworkInterface): bool =
if (networkInterface.ifType == IfSoftwareLoopback) or
(networkInterface.state == StatusUp): true else: false
proc networkInterfaceProvider*(): NetworkInterfaceProvider =
## Returns a new instance of `NetworkInterfaceProvider`.
return NetworkInterfaceProvider()
method getAddresses*(
networkInterfaceProvider: NetworkInterfaceProvider, addrFamily: AddressFamily
): seq[InterfaceAddress] {.base.} =
## This method retrieves the addresses of network interfaces based on the specified address family.
##
## The `getAddresses` method filters the available network interfaces to include only
## those that are either loopback or up. It then collects all the addresses from these
## interfaces and filters them to match the provided address family.
##
## Parameters:
## - `networkInterfaceProvider`: A provider that offers access to network interfaces.
## - `addrFamily`: The address family to filter the network addresses (e.g., `AddressFamily.IPv4` or `AddressFamily.IPv6`).
##
## Returns:
## - A sequence of `InterfaceAddress` objects that match the specified address family.
echo "Getting addresses for address family: ", addrFamily
let
interfaces = getInterfaces().filterIt(it.isLoopbackOrUp())
flatInterfaceAddresses = concat(interfaces.mapIt(it.addresses))
filteredInterfaceAddresses =
flatInterfaceAddresses.filterIt(it.host.family == addrFamily)
return filteredInterfaceAddresses
proc new*(
T: typedesc[WildcardAddressResolverService],
networkInterfaceProvider: NetworkInterfaceProvider = new(NetworkInterfaceProvider),
): T =
## This procedure initializes a new `WildcardAddressResolverService` with the provided network interface provider.
##
## Parameters:
## - `T`: The type descriptor for `WildcardAddressResolverService`.
## - `networkInterfaceProvider`: A provider that offers access to network interfaces. Defaults to a new instance of `NetworkInterfaceProvider`.
##
## Returns:
## - A new instance of `WildcardAddressResolverService`.
return T(networkInterfaceProvider: networkInterfaceProvider)
proc getProtocolArgument*(ma: MultiAddress, codec: MultiCodec): MaResult[seq[byte]] =
var buffer: seq[byte]
for item in ma:
let
ritem = ?item
code = ?ritem.protoCode()
if code == codec:
let arg = ?ritem.protoAddress()
return ok(arg)
err("Multiaddress codec has not been found")
proc getWildcardMultiAddresses(
interfaceAddresses: seq[InterfaceAddress], protocol: Protocol, port: Port
): seq[MultiAddress] =
var addresses: seq[MultiAddress]
for ifaddr in interfaceAddresses:
var address = ifaddr.host
address.port = port
MultiAddress.init(address, protocol).withValue(maddress):
addresses.add(maddress)
addresses
proc getWildcardAddress(
maddress: MultiAddress,
multiCodec: MultiCodec,
anyAddr: openArray[uint8],
addrFamily: AddressFamily,
port: Port,
networkInterfaceProvider: NetworkInterfaceProvider,
): seq[MultiAddress] =
var addresses: seq[MultiAddress]
maddress.getProtocolArgument(multiCodec).withValue(address):
if address == anyAddr:
let filteredInterfaceAddresses = networkInterfaceProvider.getAddresses(addrFamily)
addresses.add(
getWildcardMultiAddresses(filteredInterfaceAddresses, IPPROTO_TCP, port)
)
else:
addresses.add(maddress)
return addresses
proc expandWildcardAddresses(
networkInterfaceProvider: NetworkInterfaceProvider, listenAddrs: seq[MultiAddress]
): seq[MultiAddress] =
var addresses: seq[MultiAddress]
# In this loop we expand bound addresses like `0.0.0.0` and `::` to list of interface addresses.
for listenAddr in listenAddrs:
if TCP_IP.matchPartial(listenAddr):
listenAddr.getProtocolArgument(multiCodec("tcp")).withValue(portArg):
let port = Port(uint16.fromBytesBE(portArg))
if IP4.matchPartial(listenAddr):
let wildcardAddresses = getWildcardAddress(
listenAddr,
multiCodec("ip4"),
AnyAddress.address_v4,
AddressFamily.IPv4,
port,
networkInterfaceProvider,
)
addresses.add(wildcardAddresses)
elif IP6.matchPartial(listenAddr):
let wildcardAddresses = getWildcardAddress(
listenAddr,
multiCodec("ip6"),
AnyAddress6.address_v6,
AddressFamily.IPv6,
port,
networkInterfaceProvider,
)
addresses.add(wildcardAddresses)
else:
addresses.add(listenAddr)
else:
addresses.add(listenAddr)
addresses
method setup*(
self: WildcardAddressResolverService, switch: Switch
): Future[bool] {.async.} =
## Sets up the `WildcardAddressResolverService`.
##
## This method adds the address mapper to the peer's list of address mappers.
##
## Parameters:
## - `self`: The instance of `WildcardAddressResolverService` being set up.
## - `switch`: The switch context in which the service operates.
##
## Returns:
## - A `Future[bool]` that resolves to `true` if the setup was successful, otherwise `false`.
self.addressMapper = proc(
listenAddrs: seq[MultiAddress]
): Future[seq[MultiAddress]] {.async.} =
return expandWildcardAddresses(self.networkInterfaceProvider, listenAddrs)
debug "Setting up WildcardAddressResolverService"
let hasBeenSetup = await procCall Service(self).setup(switch)
if hasBeenSetup:
switch.peerInfo.addressMappers.add(self.addressMapper)
await self.run(switch)
return hasBeenSetup
method run*(self: WildcardAddressResolverService, switch: Switch) {.async, public.} =
## Runs the WildcardAddressResolverService for a given switch.
##
## It updates the peer information for the provided switch by running the registered address mapper. Any other
## address mappers that are registered with the switch will also be run.
##
trace "Running WildcardAddressResolverService"
await switch.peerInfo.update()
method stop*(
self: WildcardAddressResolverService, switch: Switch
): Future[bool] {.async, public.} =
## Stops the WildcardAddressResolverService.
##
## Handles the shutdown process of the WildcardAddressResolverService for a given switch.
## It removes the address mapper from the switch's list of address mappers.
## It then updates the peer information for the provided switch. Any wildcard address wont be resolved anymore.
##
## Parameters:
## - `self`: The instance of the WildcardAddressResolverService.
## - `switch`: The Switch object associated with the service.
##
## Returns:
## - A future that resolves to `true` if the service was successfully stopped, otherwise `false`.
debug "Stopping WildcardAddressResolverService"
let hasBeenStopped = await procCall Service(self).stop(switch)
if hasBeenStopped:
switch.peerInfo.addressMappers.keepItIf(it != self.addressMapper)
await switch.peerInfo.update()
return hasBeenStopped

View File

@@ -273,6 +273,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
except CancelledError as exc:
trace "releasing semaphore on cancellation"
upgrades.release() # always release the slot
return
except CatchableError as exc:
error "Exception in accept loop, exiting", exc = exc.msg
upgrades.release() # always release the slot
@@ -288,6 +289,12 @@ proc stop*(s: Switch) {.async, public.} =
s.started = false
try:
# Stop accepting incoming connections
await allFutures(s.acceptFuts.mapIt(it.cancelAndWait())).wait(1.seconds)
except CatchableError as exc:
debug "Cannot cancel accepts", error = exc.msg
for service in s.services:
discard await service.stop(s)
@@ -302,18 +309,6 @@ proc stop*(s: Switch) {.async, public.} =
except CatchableError as exc:
warn "error cleaning up transports", msg = exc.msg
try:
await allFutures(s.acceptFuts)
.wait(1.seconds)
except CatchableError as exc:
trace "Exception while stopping accept loops", exc = exc.msg
# check that all futures were properly
# stopped and otherwise cancel them
for a in s.acceptFuts:
if not a.finished:
a.cancel()
for service in s.services:
discard await service.stop(s)

View File

@@ -12,262 +12,327 @@
{.push raises: [].}
import std/[sequtils]
import stew/results
import chronos, chronicles
import transport,
../errors,
../wire,
../multicodec,
../connmanager,
../multiaddress,
../stream/connection,
../stream/chronosstream,
../upgrademngrs/upgrade,
../utility
import
./transport,
../wire,
../multiaddress,
../stream/connection,
../stream/chronosstream,
../upgrademngrs/upgrade,
../utility
logScope:
topics = "libp2p tcptransport"
export transport, results
export transport, connection, upgrade
const
TcpTransportTrackerName* = "libp2p.tcptransport"
const TcpTransportTrackerName* = "libp2p.tcptransport"
type
AcceptFuture = typeof(default(StreamServer).accept())
TcpTransport* = ref object of Transport
servers*: seq[StreamServer]
clients: array[Direction, seq[StreamTransport]]
flags: set[ServerFlags]
clientFlags: set[SocketFlags]
acceptFuts: seq[Future[StreamTransport]]
acceptFuts: seq[AcceptFuture]
connectionsTimeout: Duration
stopping: bool
TcpTransportError* = object of transport.TransportError
proc connHandler*(self: TcpTransport,
client: StreamTransport,
observedAddr: Opt[MultiAddress],
dir: Direction): Future[Connection] {.async.} =
trace "Handling tcp connection", address = $observedAddr,
dir = $dir,
clients = self.clients[Direction.In].len +
self.clients[Direction.Out].len
proc connHandler*(
self: TcpTransport,
client: StreamTransport,
observedAddr: Opt[MultiAddress],
dir: Direction,
): Connection =
trace "Handling tcp connection",
address = $observedAddr,
dir = $dir,
clients = self.clients[Direction.In].len + self.clients[Direction.Out].len
let conn = Connection(
ChronosStream.init(
client = client,
dir = dir,
observedAddr = observedAddr,
timeout = self.connectionsTimeout
))
timeout = self.connectionsTimeout,
)
)
proc onClose() {.async: (raises: []).} =
try:
block:
let
fut1 = client.join()
fut2 = conn.join()
try: # https://github.com/status-im/nim-chronos/issues/516
discard await race(fut1, fut2)
except ValueError: raiseAssert("Futures list is not empty")
# at least one join() completed, cancel pending one, if any
if not fut1.finished: await fut1.cancelAndWait()
if not fut2.finished: await fut2.cancelAndWait()
await noCancel client.join()
trace "Cleaning up client", addrs = $client.remoteAddress,
conn
trace "Cleaning up client", addrs = $client.remoteAddress, conn
self.clients[dir].keepItIf( it != client )
self.clients[dir].keepItIf(it != client)
block:
let
fut1 = conn.close()
fut2 = client.closeWait()
await allFutures(fut1, fut2)
if fut1.failed:
let err = fut1.error()
debug "Error cleaning up client", errMsg = err.msg, conn
static: doAssert typeof(fut2).E is void # Cannot fail
# Propagate the chronos client being closed to the connection
# TODO This is somewhat dubious since it's the connection that owns the
# client, but it allows the transport to close all connections when
# shutting down (also dubious! it would make more sense that the owner
# of all connections closes them, or the next read detects the closed
# socket and does the right thing..)
trace "Cleaned up client", addrs = $client.remoteAddress,
conn
await conn.close()
except CancelledError as exc:
let useExc {.used.} = exc
debug "Error cleaning up client", errMsg = exc.msg, conn
trace "Cleaned up client", addrs = $client.remoteAddress, conn
self.clients[dir].add(client)
asyncSpawn onClose()
return conn
proc new*(
T: typedesc[TcpTransport],
flags: set[ServerFlags] = {},
upgrade: Upgrade,
connectionsTimeout = 10.minutes): T {.public.} =
T: typedesc[TcpTransport],
flags: set[ServerFlags] = {},
upgrade: Upgrade,
connectionsTimeout = 10.minutes,
): T {.public.} =
T(
flags: flags,
clientFlags:
if ServerFlags.TcpNoDelay in flags:
{SocketFlags.TcpNoDelay}
else:
default(set[SocketFlags])
,
upgrader: upgrade,
networkReachability: NetworkReachability.Unknown,
connectionsTimeout: connectionsTimeout,
)
let
transport = T(
flags: flags,
clientFlags:
if ServerFlags.TcpNoDelay in flags:
compilesOr:
{SocketFlags.TcpNoDelay}
do:
doAssert(false)
default(set[SocketFlags])
else:
default(set[SocketFlags]),
upgrader: upgrade,
networkReachability: NetworkReachability.Unknown,
connectionsTimeout: connectionsTimeout)
method start*(self: TcpTransport, addrs: seq[MultiAddress]): Future[void] =
## Start transport listening to the given addresses - for dial-only transports,
## start with an empty list
return transport
# TODO remove `impl` indirection throughout when `raises` is added to base
method start*(
self: TcpTransport,
addrs: seq[MultiAddress]) {.async.} =
## listen on the transport
##
proc impl(
self: TcpTransport, addrs: seq[MultiAddress]
): Future[void] {.async: (raises: [transport.TransportError, CancelledError]).} =
if self.running:
warn "TCP transport already running"
return
if self.running:
warn "TCP transport already running"
return
await procCall Transport(self).start(addrs)
trace "Starting TCP transport"
trackCounter(TcpTransportTrackerName)
for i, ma in addrs:
if not self.handles(ma):
trace "Invalid address detected, skipping!", address = ma
continue
trace "Starting TCP transport"
self.flags.incl(ServerFlags.ReusePort)
let server = createStreamServer(
ma = ma,
flags = self.flags,
udata = self)
# always get the resolved address in case we're bound to 0.0.0.0:0
self.addrs[i] = MultiAddress.init(
server.sock.getLocalAddress()
).tryGet()
var supported: seq[MultiAddress]
var initialized = false
try:
for i, ma in addrs:
if not self.handles(ma):
trace "Invalid address detected, skipping!", address = ma
continue
self.servers &= server
let
ta = initTAddress(ma).expect("valid address per handles check above")
server =
try:
createStreamServer(ta, flags = self.flags)
except common.TransportError as exc:
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
trace "Listening on", address = ma
self.servers &= server
method stop*(self: TcpTransport) {.async.} =
## stop the transport
##
try:
trace "Listening on", address = ma
supported.add(
MultiAddress.init(server.sock.getLocalAddress()).expect(
"Can init from local address"
)
)
initialized = true
finally:
if not initialized:
# Clean up partial success on exception
await noCancel allFutures(self.servers.mapIt(it.closeWait()))
reset(self.servers)
try:
await procCall Transport(self).start(supported)
except CatchableError:
raiseAssert "Base method does not raise"
trackCounter(TcpTransportTrackerName)
impl(self, addrs)
method stop*(self: TcpTransport): Future[void] =
## Stop the transport and close all connections it created
proc impl(self: TcpTransport) {.async: (raises: []).} =
trace "Stopping TCP transport"
self.stopping = true
defer:
self.stopping = false
checkFutures(
await allFinished(
self.clients[Direction.In].mapIt(it.closeWait()) &
self.clients[Direction.Out].mapIt(it.closeWait())))
if self.running:
# Reset the running flag
try:
await noCancel procCall Transport(self).stop()
except CatchableError: # TODO remove when `accept` is annotated with raises
raiseAssert "doesn't actually raise"
if not self.running:
# Stop each server by closing the socket - this will cause all accept loops
# to fail - since the running flag has been reset, it's also safe to close
# all known clients since no more of them will be added
await noCancel allFutures(
self.servers.mapIt(it.closeWait()) &
self.clients[Direction.In].mapIt(it.closeWait()) &
self.clients[Direction.Out].mapIt(it.closeWait())
)
self.servers = @[]
for acceptFut in self.acceptFuts:
if acceptFut.completed():
await acceptFut.value().closeWait()
self.acceptFuts = @[]
if self.clients[Direction.In].len != 0 or self.clients[Direction.Out].len != 0:
# Future updates could consider turning this warn into an assert since
# it should never happen if the shutdown code is correct
warn "Couldn't clean up clients",
len = self.clients[Direction.In].len + self.clients[Direction.Out].len
trace "Transport stopped"
untrackCounter(TcpTransportTrackerName)
else:
# For legacy reasons, `stop` on a transpart that wasn't started is
# expected to close outgoing connections created by the transport
warn "TCP transport already stopped"
return
await procCall Transport(self).stop() # call base
var toWait: seq[Future[void]]
for fut in self.acceptFuts:
if not fut.finished:
toWait.add(fut.cancelAndWait())
elif fut.done:
toWait.add(fut.read().closeWait())
doAssert self.clients[Direction.In].len == 0,
"No incoming connections possible without start"
await noCancel allFutures(self.clients[Direction.Out].mapIt(it.closeWait()))
for server in self.servers:
server.stop()
toWait.add(server.closeWait())
impl(self)
await allFutures(toWait)
self.servers = @[]
self.acceptFuts = @[]
trace "Transport stopped"
untrackCounter(TcpTransportTrackerName)
except CatchableError as exc:
trace "Error shutting down tcp transport", exc = exc.msg
method accept*(self: TcpTransport): Future[Connection] {.async.} =
## accept a new TCP connection
method accept*(self: TcpTransport): Future[Connection] =
## accept a new TCP connection, returning nil on non-fatal errors
##
if not self.running:
raise newTransportClosedError()
try:
if self.acceptFuts.len <= 0:
self.acceptFuts = self.servers.mapIt(Future[StreamTransport](it.accept()))
## Raises an exception when the transport is broken and cannot be used for
## accepting further connections
# TODO returning nil for non-fatal errors is problematic in that error
# information is lost and must be logged here instead of being
# available to the caller - further refactoring should propagate errors
# to the caller instead
proc impl(
self: TcpTransport
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
if not self.running:
raise newTransportClosedError()
if self.acceptFuts.len <= 0:
return
self.acceptFuts = self.servers.mapIt(it.accept())
let
finished = await one(self.acceptFuts)
finished =
try:
await one(self.acceptFuts)
except ValueError:
raise (ref TcpTransportError)(msg: "No listeners configured")
index = self.acceptFuts.find(finished)
transp =
try:
await finished
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
return nil
except TransportAbortedError as exc:
debug "Connection aborted", exc = exc.msg
return nil
except TransportUseClosedError as exc:
raise newTransportClosedError(exc)
except TransportOsError as exc:
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
except common.TransportError as exc: # Needed for chronos 4.0.0 support
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
except CancelledError as exc:
raise exc
if not self.running: # Stopped while waiting
await transp.closeWait()
raise newTransportClosedError()
self.acceptFuts[index] = self.servers[index].accept()
let transp = await finished
try:
let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet()
return await self.connHandler(transp, Opt.some(observedAddr), Direction.In)
except CancelledError as exc:
debug "CancelledError", exc = exc.msg
transp.close()
raise exc
except CatchableError as exc:
debug "Failed to handle connection", exc = exc.msg
transp.close()
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
except TransportAbortedError as exc:
debug "Connection aborted", exc = exc.msg
except TransportUseClosedError as exc:
debug "Server was closed", exc = exc.msg
raise newTransportClosedError(exc)
except CancelledError as exc:
raise exc
except TransportOsError as exc:
info "OS Error", exc = exc.msg
raise exc
except CatchableError as exc:
info "Unexpected error accepting connection", exc = exc.msg
raise exc
let remote =
try:
transp.remoteAddress
except TransportOsError as exc:
# The connection had errors / was closed before `await` returned control
await transp.closeWait()
debug "Cannot read remote address", exc = exc.msg
return nil
let observedAddr =
MultiAddress.init(remote).expect("Can initialize from remote address")
self.connHandler(transp, Opt.some(observedAddr), Direction.In)
impl(self)
method dial*(
self: TcpTransport,
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId)): Future[Connection] {.async.} =
self: TcpTransport,
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId),
): Future[Connection] =
## dial a peer
##
proc impl(
self: TcpTransport, hostname: string, address: MultiAddress, peerId: Opt[PeerId]
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
if self.stopping:
raise newTransportClosedError()
trace "Dialing remote peer", address = $address
let transp =
if self.networkReachability == NetworkReachability.NotReachable and self.addrs.len > 0:
self.clientFlags.incl(SocketFlags.ReusePort)
await connect(address, flags = self.clientFlags, localAddress = Opt.some(self.addrs[0]))
else:
await connect(address, flags = self.clientFlags)
let ta = initTAddress(address).valueOr:
raise (ref TcpTransportError)(msg: "Unsupported address: " & $address)
try:
let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet()
return await self.connHandler(transp, Opt.some(observedAddr), Direction.Out)
except CatchableError as err:
await transp.closeWait()
raise err
trace "Dialing remote peer", address = $address
let transp =
try:
await(
if self.networkReachability == NetworkReachability.NotReachable and
self.addrs.len > 0:
let local = initTAddress(self.addrs[0]).expect("self address is valid")
self.clientFlags.incl(SocketFlags.ReusePort)
connect(ta, flags = self.clientFlags, localAddress = local)
else:
connect(ta, flags = self.clientFlags)
)
except CancelledError as exc:
raise exc
except CatchableError as exc:
raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
# If `stop` is called after `connect` but before `await` returns, we might
# end up with a race condition where `stop` returns but not all connections
# have been closed - we drop connections in this case in order not to leak
# them
if self.stopping:
# Stopped while waiting for new connection
await transp.closeWait()
raise newTransportClosedError()
let observedAddr =
try:
MultiAddress.init(transp.remoteAddress).expect("remote address is valid")
except TransportOsError as exc:
await transp.closeWait()
raise (ref TcpTransportError)(msg: exc.msg)
self.connHandler(transp, Opt.some(observedAddr), Direction.Out)
impl(self, hostname, address, peerId)
method handles*(t: TcpTransport, address: MultiAddress): bool =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return TCP.match(address)
return TCP.match(address)

View File

@@ -200,7 +200,7 @@ method dial*(
try:
await dialPeer(transp, address)
return await self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out)
return self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out)
except CatchableError as err:
await transp.closeWait()
raise err

View File

@@ -35,7 +35,7 @@ type
upgrader*: Upgrade
networkReachability*: NetworkReachability
proc newTransportClosedError*(parent: ref Exception = nil): ref LPError =
proc newTransportClosedError*(parent: ref Exception = nil): ref TransportError =
newException(TransportClosedError,
"Transport closed, no more connections!", parent)

View File

@@ -112,6 +112,9 @@ template withValue*[T](self: Opt[T] | Option[T], value, body: untyped): untyped
let value {.inject.} = temp.get()
body
template withValue*[T, E](self: Result[T, E], value, body: untyped): untyped =
self.toOpt().withValue(value, body)
macro withValue*[T](self: Opt[T] | Option[T], value, body, elseStmt: untyped): untyped =
let elseBody = elseStmt[0]
quote do:

View File

@@ -13,6 +13,8 @@
import chronos, stew/endians2
import multiaddress, multicodec, errors, utility
export multiaddress, chronos
when defined(windows):
import winlean
else:
@@ -30,7 +32,6 @@ const
UDP,
)
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
##
@@ -76,7 +77,7 @@ proc connect*(
child: StreamTransport = nil,
flags = default(set[SocketFlags]),
localAddress: Opt[MultiAddress] = Opt.none(MultiAddress)): Future[StreamTransport]
{.raises: [LPError, MaInvalidAddress].} =
{.async.} =
## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
@@ -88,12 +89,12 @@ proc connect*(
let transportAddress = initTAddress(ma).tryGet()
compilesOr:
return connect(transportAddress, bufferSize, child,
return await connect(transportAddress, bufferSize, child,
if localAddress.isSome(): initTAddress(localAddress.expect("just checked")).tryGet() else: TransportAddress(),
flags)
do:
# support for older chronos versions
return connect(transportAddress, bufferSize, child)
return await connect(transportAddress, bufferSize, child)
proc createStreamServer*[T](ma: MultiAddress,
cbproc: StreamCallback,

46
tests/di/testdi.nim Normal file
View File

@@ -0,0 +1,46 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
import ../helpers
import ../../di/di
type
MyInterface = ref object of RootObj
MyImplementation = ref object of MyInterface
AnotherImplementation = ref object of MyInterface
MyObject = object
method doSomething(obj: MyInterface) {.base.} = discard
method doSomething(obj: MyImplementation) =
echo "MyImplementation doing something!"
method doSomething(obj: AnotherImplementation) =
echo "AnotherImplementation doing something!"
proc provideMyImplementation(): MyInterface =
MyImplementation()
proc provideAnotherImplementation(): MyInterface =
AnotherImplementation()
suite "DI":
asyncTest "DI":
let container = Container()
register[MyInterface](container, provideMyImplementation, "myImplementation")
register[MyInterface](container, provideAnotherImplementation, "anotherImplementation")
let myImplementation = resolve[MyInterface](container, "anotherImplementation")
myImplementation.doSomething()

View File

@@ -569,8 +569,8 @@ suite "GossipSub":
proc slowValidator(topic: string, message: Message): Future[ValidationResult] {.async.} =
await cRelayed
# Empty A & C caches to detect duplicates
gossip1.seen = TimedCache[MessageId].init()
gossip3.seen = TimedCache[MessageId].init()
gossip1.seen = TimedCache[SaltedId].init()
gossip3.seen = TimedCache[SaltedId].init()
let msgId = toSeq(gossip2.validationSeen.keys)[0]
checkUntilTimeout(try: gossip2.validationSeen[msgId].len > 0 except: false)
result = ValidationResult.Accept

View File

@@ -1,12 +1,11 @@
{.used.}
import unittest2, options, sets, sequtils
import unittest2, sequtils
import stew/byteutils
import ../../libp2p/[peerid,
crypto/crypto,
protocols/pubsub/mcache,
protocols/pubsub/rpc/messages]
import ./utils
protocols/pubsub/rpc/message]
var rng = newRng()

View File

@@ -24,6 +24,8 @@ suite "TimedCache":
2 in cache
3 in cache
cache.addedAt(2) == now + 3.seconds
check:
cache.put(2, now + 7.seconds) # refreshes 2
not cache.put(4, now + 12.seconds) # expires 3
@@ -33,6 +35,23 @@ suite "TimedCache":
3 notin cache
4 in cache
check:
cache.del(4).isSome()
4 notin cache
check:
not cache.put(100, now + 100.seconds) # expires everything
100 in cache
2 notin cache
test "enough items to force cache heap storage growth":
var cache = TimedCache[int].init(5.seconds)
let now = Moment.now()
for i in 101..100000:
check:
not cache.put(i, now)
for i in 101..100000:
check:
i in cache

View File

@@ -315,7 +315,6 @@ suite "Circuit Relay V2":
await sleepAsync(chronos.timer.seconds(ttl + 1))
expect(DialFailedError):
check: conn.atEof()
await conn.close()
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)

View File

@@ -0,0 +1,86 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/[options, sequtils]
import stew/[byteutils]
import chronos, metrics
import unittest2
import ../libp2p/[builders, switch]
import ../libp2p/services/wildcardresolverservice
import ../libp2p/[multiaddress, multicodec]
import ./helpers
import ../di/di
type NetworkInterfaceProviderMock* = ref object of NetworkInterfaceProvider
method getAddresses(
networkInterfaceProvider: NetworkInterfaceProviderMock, addrFamily: AddressFamily
): seq[InterfaceAddress] {.gcsafe, raises: [].} =
echo "getAddressesMock"
try:
if addrFamily == AddressFamily.IPv4:
return
@[
InterfaceAddress.init(initTAddress("127.0.0.1:0"), 8),
InterfaceAddress.init(initTAddress("192.168.1.22:0"), 24),
]
else:
return
@[
InterfaceAddress.init(initTAddress("::1:0"), 8),
InterfaceAddress.init(initTAddress("fe80::1:0"), 64),
]
except TransportAddressError as e:
echo "Error: " & $e.msg
fail()
proc networkInterfaceProviderMock(): NetworkInterfaceProvider =
NetworkInterfaceProviderMock.new()
proc createSwitch(): Switch =
SwitchBuilder
.new()
.withRng(newRng())
.withAddresses(
@[
MultiAddress.init("/ip4/0.0.0.0/tcp/0/").tryGet(),
MultiAddress.init("/ip6/::/tcp/0/").tryGet(),
]
)
.withTcpTransport()
.withMplex()
.withNoise()
.withBinding(networkInterfaceProviderMock)
.build()
suite "WildcardAddressResolverService":
teardown:
checkTrackers()
asyncTest "WildcardAddressResolverService must resolve wildcard addresses and stop doing so when stopped":
let switch = createSwitch()
await switch.start()
let tcpIp4 = switch.peerInfo.addrs[0][multiCodec("tcp")].get # tcp port for ip4
let tcpIp6 = switch.peerInfo.addrs[2][multiCodec("tcp")].get # tcp port for ip6
check switch.peerInfo.addrs ==
@[
MultiAddress.init("/ip4/127.0.0.1" & $tcpIp4).get,
MultiAddress.init("/ip4/192.168.1.22" & $tcpIp4).get,
MultiAddress.init("/ip6/::1" & $tcpIp6).get,
MultiAddress.init("/ip6/fe80::1" & $tcpIp6).get,
]
await switch.stop()
check switch.peerInfo.addrs ==
@[
MultiAddress.init("/ip4/0.0.0.0" & $tcpIp4).get,
MultiAddress.init("/ip6/::" & $tcpIp6).get,
]

View File

@@ -377,3 +377,24 @@ suite "Yamux":
expect LPStreamClosedError: discard await streamA.readLp(100)
blocker.complete()
await streamA.close()
asyncTest "Peer must be able to read from stream after closing it for writing":
mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
except CancelledError, LPStreamError:
return
try:
await conn.writeLp(fromHex("5678"))
except CancelledError, LPStreamError:
return
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
await streamA.close()
check (await streamA.readLp(100)) == fromHex("5678")

View File

@@ -11,6 +11,6 @@ COPY . nim-libp2p/
RUN \
cd nim-libp2p && \
nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN --threads:off ./tests/transport-interop/main.nim
nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim
ENTRYPOINT ["/app/nim-libp2p/tests/transport-interop/main"]