Mirror of https://github.com/vacp2p/nim-libp2p.git

Compare commits: v1.7.0...mcachehitm (22 commits)

Commits (SHA1):
81e70551c6, bf80684b1a, 129e54a70b, 20ea5b5a40, 08bb0104ed, a751baeb8b,
436bc5cb0f, df50d87a49, ed70a576a1, ffe5b6272a, 45ca569040, f0865fc394,
b06c67dff9, 79d3181667, 8c86f99379, 6cc1d1cda5, 2f46751c68, 9b4b68b9f9,
15a107a368, 0d6c60df05, 013d773796, 1df5fc18d7

@@ -64,6 +64,7 @@ type
     closeCode*: MessageType      # cached in/out close code
     resetCode*: MessageType      # cached in/out reset code
     writes*: int                 # In-flight writes
+    writesBytes*: int            # In-flight writes bytes

 func shortLog*(s: LPChannel): auto =
   try:

@@ -228,6 +229,7 @@ proc completeWrite(
     s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
   try:
     s.writes += 1
+    s.writesBytes += msgLen

     when defined(libp2p_mplex_metrics):
       libp2p_mplex_qlen.observe(s.writes.int64 - 1)

@@ -257,6 +259,10 @@ proc completeWrite(
     raise newLPStreamConnDownError(exc)
   finally:
     s.writes -= 1
+    s.writesBytes -= msgLen
+
+method queuedSendBytes*(channel: LPChannel): int = channel.writesBytes
+
 method write*(s: LPChannel, msg: seq[byte]): Future[void] =
   ## Write to mplex channel - there may be up to MaxWrite concurrent writes

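The two hunks above add simple in-flight accounting to LPChannel: the counters go up before the queued write and come back down in `finally`, so `queuedSendBytes` can report how much data is still waiting without scanning any queue. A standalone sketch of the pattern (`FakeChannel` and the sleep are stand-ins, not nim-libp2p API):

import chronos

type FakeChannel = ref object
  writes: int       # in-flight write operations
  writesBytes: int  # in-flight write bytes

proc completeWrite(s: FakeChannel, msgLen: int) {.async.} =
  try:
    s.writes += 1
    s.writesBytes += msgLen
    await sleepAsync(10.milliseconds)  # stands in for the actual send
  finally:
    # decrement in `finally` so a failed write cannot leak queued bytes
    s.writes -= 1
    s.writesBytes -= msgLen

proc queuedSendBytes(s: FakeChannel): int = s.writesBytes

when isMainModule:
  let ch = FakeChannel()
  let fut = completeWrite(ch, 128)
  echo ch.queuedSendBytes()  # 128 while the write is in flight
  waitFor fut
  echo ch.queuedSendBytes()  # 0 once it completes
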
@@ -178,7 +178,12 @@ method handle*(m: Mplex) {.async, gcsafe.} =

         trace "pushing data to channel", m, channel, len = data.len
         try:
+          let start = Moment.now()
           await channel.pushData(data)
+          let delay = Moment.now() - start
+
+          if delay > 50.milliseconds and m.connection.shortAgent == "lodestar":
+            debug "pushData was slow!", delay, protocol=channel.protocol, peer = $m.connection.peerId
           trace "pushed data to channel", m, channel, len = data.len
         except LPStreamClosedError as exc:
           # Channel is being closed, but `cleanupChann` was not yet triggered.

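This hunk brackets `pushData` with a wall-clock measurement and logs (via chronicles' `debug`) when a push to a lodestar peer took over 50 ms. A minimal chronos timing sketch, with a sleep standing in for `pushData`:

import chronos

proc timedPush() {.async.} =
  let start = Moment.now()
  await sleepAsync(60.milliseconds)    # stands in for channel.pushData(data)
  let delay = Moment.now() - start
  if delay > 50.milliseconds:
    echo "pushData was slow! ", delay  # the diff uses chronicles' `debug` here

waitFor timedPush()
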
@@ -176,6 +176,8 @@ proc sendQueueBytes(channel: YamuxChannel, limit: bool = false): int =
   for (elem, sent, _) in channel.sendQueue:
     result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent))

+method queuedSendBytes*(channel: YamuxChannel): int = channel.sendQueueBytes()
+
 proc actuallyClose(channel: YamuxChannel) {.async.} =
   if channel.closedLocally and channel.sendQueue.len == 0 and
       channel.closedRemotely.done():

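For yamux, queued bytes are derived from the existing send queue: every entry tracks how many of its bytes were already sent, and the unsent remainders are summed. A simplified sketch (two-field tuples here, where the real queue entries carry a third field, and without the `limit` cap from the hunk above):

var sendQueue: seq[tuple[elem: seq[byte], sent: int]] = @[
  (newSeq[byte](100), 40),  # 60 bytes still unsent
  (newSeq[byte](50), 0),    # 50 bytes still unsent
]

proc sendQueueBytes(): int =
  for (elem, sent) in sendQueue:
    result.inc(elem.len - sent)

echo sendQueueBytes()  # 110
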
@@ -158,8 +158,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
     peer.appScore = stats.appScore
     peer.behaviourPenalty = stats.behaviourPenalty

-  peer.iWantBudget = IWantPeerBudget
   peer.iHaveBudget = IHavePeerBudget

 method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
   case event.kind

@@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

-import std/[tables, sequtils, sets, algorithm]
+import std/[tables, sequtils, sets, algorithm, deques]
 import chronos, chronicles, metrics
 import "."/[types, scoring]
 import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]

@@ -31,7 +31,7 @@ declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics in mesh with no
 declareGauge(libp2p_gossipsub_low_peers_topics, "number of topics in mesh with at least one but below dlow peers")
 declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)")
 declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
+declareSummary(libp2p_gossipsub_mcache_hit, "ratio of successful IWANT message cache lookups")
 declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])

 proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
   g.withPeerStats(p.peerId) do (stats: var PeerStats):

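A note on the new `libp2p_gossipsub_mcache_hit` summary: Prometheus summaries export a running `sum` and `count`, so observing 1 on every cache hit and 0 on every miss makes `sum / count` exactly the hit ratio. A dependency-free illustration of that arithmetic:

var sum, count = 0.0

proc observe(value: float) =
  sum += value
  count += 1

for hit in [true, true, false, true]:
  observe(if hit: 1.0 else: 0.0)

echo "mcache hit ratio: ", sum / count  # 0.75
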
@@ -272,28 +272,32 @@ proc handleIHave*(g: GossipSub,
 proc handleIWant*(g: GossipSub,
                   peer: PubSubPeer,
                   iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} =
-  var messages: seq[Message]
+  if peer.shortAgent == "lodestar":
+    info "Got IWANT", peerType=peer.shortAgent, peerId=peer.peerId, msgs=iwants.mapIt(it.messageIds.len)
+  var
+    messages: seq[Message]
+    invalidRequests = 0
   if peer.score < g.parameters.gossipThreshold:
     trace "iwant: ignoring low score peer", peer, score = peer.score
-  elif peer.iWantBudget <= 0:
-    trace "iwant: ignoring out of budget peer", peer, score = peer.score
   else:
-    let deIwants = iwants.deduplicate()
-    for iwant in deIwants:
-      let deIwantsMsgs = iwant.messageIds.deduplicate()
-      for mid in deIwantsMsgs:
+    for iwant in iwants:
+      for mid in iwant.messageIds:
         trace "peer sent iwant", peer, messageID = mid
+        # canAskIWant will only return true once for a specific message
+        if not peer.canAskIWant(mid):
+          libp2p_gossipsub_received_iwants.inc(1, labelValues=["notsent"])
+
+          invalidRequests.inc()
+          if invalidRequests > 20:
+            libp2p_gossipsub_received_iwants.inc(1, labelValues=["skipped"])
+            return messages
+          continue
         let msg = g.mcache.get(mid)
         if msg.isSome:
-          # avoid spam
-          if peer.iWantBudget > 0:
-            messages.add(msg.get())
-            dec peer.iWantBudget
-          else:
-            break
+          libp2p_gossipsub_mcache_hit.observe(1)
+          libp2p_gossipsub_received_iwants.inc(1, labelValues=["correct"])
+          messages.add(msg.get())
+        else:
+          libp2p_gossipsub_mcache_hit.observe(0)
+          libp2p_gossipsub_received_iwants.inc(1, labelValues=["unknown"])
   return messages

 proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} =

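Two behavioural changes in this hunk are worth spelling out. First, deduplicating IWANT ids becomes unnecessary: `canAskIWant` consumes the id from the advertised window, so a repeated id simply fails the check. Second, a circuit breaker stops serving a peer within one RPC after more than 20 requests for ids we never advertised. A dependency-free sketch of that control flow (the ids and `canAsk` helper are hypothetical):

import std/sets

var advertised = toHashSet(["a", "b"])
var invalidRequests = 0

proc canAsk(id: string): bool =
  # like canAskIWant: an advertised id can be claimed exactly once
  if id in advertised:
    advertised.excl(id)
    true
  else:
    false

for id in ["a", "x", "a", "b"]:
  if not canAsk(id):
    invalidRequests.inc()
    if invalidRequests > 20:
      break        # "skipped": give up on this RPC entirely
    continue       # "notsent": skip just this id
  echo "serving ", id
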
@@ -586,8 +590,6 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:

       cacheWindowSize += midsSeq.len

-      trace "got messages to emit", size=midsSeq.len
-
       # not in spec
       # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
       # and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582

@@ -616,8 +618,13 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
     g.rng.shuffle(allPeers)
     allPeers.setLen(target)

+    #info "got messages to emit", size=midsSeq.len, topic, msgs=midsSeq.mapIt(it.toHex()), peers=allPeers.len, peerValues=allPeers.mapIt((it.shortAgent, it.queuedSendBytes, $it.peerId))
+
+    let msgIdsAsSet = ihave.messageIds.toHashSet()
+
     for peer in allPeers:
       control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
+      peer.sentIHaves[^1].incl(msgIdsAsSet)

   libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)

@@ -628,7 +635,9 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
   # reset IHAVE cap
   block:
     for peer in g.peers.values:
-      peer.iWantBudget = IWantPeerBudget
+      peer.sentIHaves.addFirst(default(HashSet[MessageId]))
+      if peer.sentIHaves.len > g.parameters.historyLength:
+        discard peer.sentIHaves.popLast()
       peer.iHaveBudget = IHavePeerBudget

   var meshMetrics = MeshMetrics()

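The heartbeat now maintains the IHAVE bookkeeping window instead of resetting an IWANT budget: a fresh set is pushed to the front each heartbeat and the oldest is dropped once `historyLength` windows exist, mirroring the mcache history. Standalone sketch (the `historyLength` value is assumed; the real one comes from the gossipsub parameters):

import std/[deques, sets]

const historyLength = 5
var sentIHaves = initDeque[HashSet[int]]()

for heartbeat in 0 ..< 8:
  sentIHaves.addFirst(default(HashSet[int]))
  if sentIHaves.len > historyLength:
    discard sentIHaves.popLast()
  sentIHaves[0].incl(heartbeat)  # ids advertised during this heartbeat

echo sentIHaves.len  # stays capped at 5
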
@@ -48,7 +48,6 @@ const

 const
   BackoffSlackTime* = 2 # seconds
-  IWantPeerBudget* = 25 # 25 messages per second ( reset every heartbeat )
   IHavePeerBudget* = 10
   # the max amount of IHave to expose, not by spec, but go as example
   # rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572

@@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}

-import std/[sequtils, strutils, tables, hashes, options]
+import std/[sequtils, strutils, tables, hashes, options, sets, deques]
 import stew/results
 import chronos, chronicles, nimcrypto/sha2, metrics
 import rpc/[messages, message, protobuf],

@@ -62,18 +62,28 @@ type
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr

     score*: float64
-    iWantBudget*: int
+    sentIHaves*: Deque[HashSet[MessageId]]
     iHaveBudget*: int
     maxMessageSize: int
     appScore*: float64 # application specific score
     behaviourPenalty*: float64 # the eventual penalty score

-    when defined(libp2p_agents_metrics):
-      shortAgent*: string
-
   RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
     {.gcsafe, raises: [Defect].}

+when defined(libp2p_agents_metrics):
+  func shortAgent*(p: PubSubPeer): string =
+    if p.sendConn.isNil:
+      "unknown"
+    else:
+      #TODO the sendConn is setup before identify,
+      #so we have to read the parents short agent..
+      p.sendConn.getWrapped().shortAgent
+
+proc queuedSendBytes*(p: PubSubPeer): int =
+  if p.sendConn.isNil: -2
+  else: p.sendConn.queuedSendBytes()
+
 func hash*(p: PubSubPeer): Hash =
   p.peerId.hash

@@ -150,7 +160,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
         # metrics
         libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])

-        await p.handler(p, rmsg.get())
+        discard p.handler(p, rmsg.get())
     finally:
       await conn.close()
   except CancelledError:

@@ -286,6 +296,13 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =

   asyncSpawn p.sendEncoded(encoded)

+proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
+  for sentIHave in p.sentIHaves.mitems():
+    if msgId in sentIHave:
+      sentIHave.excl(msgId)
+      return true
+  return false
+
 proc new*(
   T: typedesc[PubSubPeer],
   peerId: PeerId,

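Usage sketch of the claim-once semantics above: a message id is honoured only if it is still present in one of the advertised windows, and a successful lookup removes it, so a second request for the same id is rejected (the `Peer` type here is a hypothetical stand-in, not the real PubSubPeer):

import std/[deques, sets]

type Peer = object
  sentIHaves: Deque[HashSet[string]]

proc canAskIWant(p: var Peer, msgId: string): bool =
  for sentIHave in p.sentIHaves.mitems():
    if msgId in sentIHave:
      sentIHave.excl(msgId)
      return true
  false

var p = Peer(sentIHaves: initDeque[HashSet[string]]())
p.sentIHaves.addFirst(toHashSet(["msg-a"]))
echo p.canAskIWant("msg-a")  # true: we advertised it, first claim succeeds
echo p.canAskIWant("msg-a")  # false: a repeat request is treated as invalid
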
@@ -294,7 +311,7 @@ proc new*(
   codec: string,
   maxMessageSize: int): T =

-  T(
+  result = T(
     getConn: getConn,
     onEvent: onEvent,
     codec: codec,

@@ -302,3 +319,4 @@ proc new*(
     connectedFut: newFuture[void](),
     maxMessageSize: maxMessageSize
   )
+  result.sentIHaves.addFirst(default(HashSet[MessageId]))

@@ -14,7 +14,7 @@ else:

 import std/[tables]

-import chronos/timer
+import chronos/timer, stew/results

 const Timeout* = 10.seconds # default timeout in ms

@@ -22,6 +22,7 @@ type
   TimedEntry*[K] = ref object of RootObj
     key: K
     addedAt: Moment
+    expiresAt: Moment
     next, prev: TimedEntry[K]

   TimedCache*[K] = object of RootObj

@@ -30,14 +31,13 @@ type
     timeout: Duration

 func expire*(t: var TimedCache, now: Moment = Moment.now()) =
-  let expirationLimit = now - t.timeout
-  while t.head != nil and t.head.addedAt < expirationLimit:
+  while t.head != nil and t.head.expiresAt < now:
     t.entries.del(t.head.key)
     t.head.prev = nil
     t.head = t.head.next
   if t.head == nil: t.tail = nil

-func del*[K](t: var TimedCache[K], key: K): bool =
+func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] =
   # Removes existing key from cache, returning false if it was not present
   var item: TimedEntry[K]
   if t.entries.pop(key, item):

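`del` now hands the removed entry back instead of a bool, so the reworked `put` in the next hunk can preserve its `addedAt`. `Opt[T]` is stew/results' option-like type; a minimal sketch of the API the new code relies on:

import stew/results

proc takeEven(x: int): Opt[int] =
  if x mod 2 == 0: Opt.some(x)
  else: Opt.none(int)

let previous = takeEven(4)
if previous.isSome:
  echo previous.get()  # 4: the "removed" value, analogous to the old entry
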
@@ -46,9 +46,9 @@ func del*[K](t: var TimedCache[K], key: K): bool =

     if item.next != nil: item.next.prev = item.prev
     if item.prev != nil: item.prev.next = item.next
-    true
+    Opt.some(item)
   else:
-    false
+    Opt.none(TimedEntry[K])

 func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
   # Puts k in cache, returning true if the item was already present and false

@@ -56,9 +56,13 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
   # refreshed.
   t.expire(now)

-  var res = t.del(k) # Refresh existing item
+  var previous = t.del(k) # Refresh existing item

-  let node = TimedEntry[K](key: k, addedAt: now)
+  let addedAt =
+    if previous.isSome: previous.get().addedAt
+    else: now
+
+  let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout)

   if t.head == nil:
     t.tail = node

@@ -66,7 +70,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
   else:
     # search from tail because typically that's where we add when now grows
     var cur = t.tail
-    while cur != nil and node.addedAt < cur.addedAt:
+    while cur != nil and node.expiresAt < cur.expiresAt:
       cur = cur.prev

     if cur == nil:

@@ -82,7 +86,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =

   t.entries[k] = node

-  res
+  previous.isSome()

 func contains*[K](t: TimedCache[K], k: K): bool =
   k in t.entries

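The net effect of the TimedCache changes: refreshing a key keeps its original `addedAt` but pushes `expiresAt` forward, and expiry compares `expiresAt` against now rather than insertion age. A table-based sketch of just those semantics (integer timestamps instead of `Moment`, no linked list):

import std/tables

type Entry = object
  addedAt, expiresAt: int

const timeout = 10
var entries = initTable[string, Entry]()

proc put(k: string, now: int): bool =
  let existed = k in entries
  let addedAt = if existed: entries[k].addedAt else: now
  entries[k] = Entry(addedAt: addedAt, expiresAt: now + timeout)
  existed  # like the real put: true if the key was already present

proc expire(now: int) =
  var dead: seq[string]
  for k, e in entries:
    if e.expiresAt < now: dead.add(k)
  for k in dead: entries.del(k)

discard put("x", now = 0)   # addedAt = 0, expiresAt = 10
discard put("x", now = 8)   # refresh: addedAt stays 0, expiresAt = 18
expire(now = 12)
echo "x" in entries         # true: the refresh extended the lifetime
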
@@ -173,6 +173,8 @@ method closed*(s: LPStream): bool {.base, public.} =
 method atEof*(s: LPStream): bool {.base, public.} =
   s.isEof

+method queuedSendBytes*(s: LPStream): int {.base.} = -1
+
 method readOnce*(
   s: LPStream,
   pbytes: pointer,

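The base `queuedSendBytes` returns -1 as an "unknown" sentinel that muxed channels override (and the pubsub layer separately returns -2 when it has no send connection). A sketch of the dispatch pattern with hypothetical stand-in types:

type
  Streamish = ref object of RootObj
  Channelish = ref object of Streamish
    writesBytes: int

method queuedSendBytes(s: Streamish): int {.base.} = -1  # unknown
method queuedSendBytes(s: Channelish): int = s.writesBytes

let plain = Streamish()
let chan = Channelish(writesBytes: 42)
echo plain.queuedSendBytes()  # -1
let viaBase: Streamish = chan
echo viaBase.queuedSendBytes()  # 42: dispatch is dynamic
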
@@ -63,5 +63,5 @@ when defined(libp2p_agents_metrics):
       err("toLowerAscii failed")

 const
-  KnownLibP2PAgents* {.strdefine.} = ""
+  KnownLibP2PAgents* {.strdefine.} = "nim-libp2p"
   KnownLibP2PAgentsSeq* = KnownLibP2PAgents.safeToLowerAscii().tryGet().split(",")

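`{.strdefine.}` consts keep their literal as a default but can be overridden at compile time, which is why defaulting to "nim-libp2p" makes the agent-metrics test below pass without extra flags. A standalone sketch:

import std/strutils

const KnownAgents {.strdefine.} = "nim-libp2p"
# override with: nim c -d:KnownAgents="lodestar,nimbus" app.nim

echo KnownAgents.toLowerAscii().split(",")  # @["nim-libp2p"] by default
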
@@ -19,7 +19,9 @@ import utils,
   protocols/pubsub/pubsub,
   protocols/pubsub/floodsub,
   protocols/pubsub/rpc/messages,
-  protocols/pubsub/peertable]
+  protocols/pubsub/peertable,
+  protocols/pubsub/pubsubpeer
+  ]
 import ../../libp2p/protocols/pubsub/errors as pubsub_errors

 import ../helpers

@@ -62,6 +64,14 @@ suite "FloodSub":
     check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
     check (await completionFut.wait(5.seconds)) == true

+    when defined(libp2p_agents_metrics):
+      let
+        agentA = nodes[0].peers[nodes[1].switch.peerInfo.peerId].shortAgent
+        agentB = nodes[1].peers[nodes[0].switch.peerInfo.peerId].shortAgent
+      check:
+        agentA == "nim-libp2p"
+        agentB == "nim-libp2p"
+
     await allFuturesThrowing(
       nodes[0].switch.stop(),
       nodes[1].switch.stop()

@@ -2,7 +2,7 @@ include ../../libp2p/protocols/pubsub/gossipsub

 {.used.}

-import options
+import std/[options, deques]
 import stew/byteutils
 import ../../libp2p/builders
 import ../../libp2p/errors

@@ -713,6 +713,7 @@ suite "GossipSub internal":
     let peer = gossipSub.getPubSubPeer(peerId)
     let id = @[0'u8, 1, 2, 3]
     gossipSub.mcache.put(id, Message())
+    peer.sentIHaves[^1].incl(id)
     let msg = ControlIWant(
       messageIDs: @[id, id, id]
     )