Compare commits

...

22 Commits

Author SHA1 Message Date
Tanguy  81e70551c6  Merge remote-tracking branch 'origin/betteriwanthandling' into HEAD  2023-03-20 11:48:00 +01:00
Tanguy  bf80684b1a  add ddos protection  2023-03-20 11:42:39 +01:00
Tanguy  129e54a70b  switch to deque  2023-03-14 15:41:17 +01:00
Tanguy  20ea5b5a40  Better IWANT handling  2023-03-14 15:33:48 +01:00
Tanguy  08bb0104ed  revert  2023-03-07 11:03:06 +01:00
Tanguy  a751baeb8b  dumb test  2023-03-06 17:57:24 +01:00
Tanguy  436bc5cb0f  less logs  2023-03-06 15:18:58 +01:00
Tanguy  df50d87a49  more logs  2023-03-06 15:18:20 +01:00
Tanguy  ed70a576a1  Revert "ugly test" (This reverts commit ffe5b6272a.)  2023-03-06 12:24:46 +01:00
Tanguy  ffe5b6272a  ugly test  2023-03-03 15:33:42 +01:00
Tanguy  45ca569040  Merge remote-tracking branch 'origin/unstable' into HEAD  2023-03-03 15:00:48 +01:00
Tanguy  f0865fc394  avoid slow push  2023-03-01 15:10:41 +01:00
Tanguy  b06c67dff9  extra log  2023-02-28 18:13:39 +01:00
Tanguy  79d3181667  fix yamux  2023-02-22 11:16:19 +01:00
Tanguy  8c86f99379  add slow pushdata log  2023-02-22 11:12:30 +01:00
Tanguy  6cc1d1cda5  fix  2023-02-20 13:53:34 +01:00
Tanguy  2f46751c68  Add queued send bytes  2023-02-20 13:41:53 +01:00
Tanguy  9b4b68b9f9  even more logs  2023-02-09 18:20:56 +01:00
Tanguy  15a107a368  Fix & test shortAgent  2023-02-09 18:13:49 +01:00
Tanguy  0d6c60df05  GossipSub: shortAgent & TimedEntry fixes  2023-02-07 13:37:38 +01:00
Tanguy  013d773796  add log  2023-02-07 12:10:19 +01:00
Tanguy  1df5fc18d7  try other mcache metric  2023-02-07 11:47:14 +01:00
12 changed files with 98 additions and 43 deletions

View File

@@ -64,6 +64,7 @@ type
closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes
writesBytes*: int # In-flight writes bytes
func shortLog*(s: LPChannel): auto =
try:
@@ -228,6 +229,7 @@ proc completeWrite(
s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
try:
s.writes += 1
s.writesBytes += msgLen
when defined(libp2p_mplex_metrics):
libp2p_mplex_qlen.observe(s.writes.int64 - 1)
@@ -257,6 +259,10 @@ proc completeWrite(
raise newLPStreamConnDownError(exc)
finally:
s.writes -= 1
s.writesBytes -= msgLen
method queuedSendBytes*(channel: LPChannel): int = channel.writesBytes
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
## Write to mplex channel - there may be up to MaxWrite concurrent writes
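
The hunk above threads a byte counter through completeWrite so the channel can report how much data is still in flight. Below is a minimal, self-contained sketch of that accounting pattern only; FakeChannel and rawWrite are made-up names, and std/asyncdispatch stands in for chronos.

import std/asyncdispatch

type FakeChannel = ref object
  writes: int        # in-flight write operations
  writesBytes: int   # in-flight write bytes

proc rawWrite(c: FakeChannel, msg: seq[byte]) {.async.} =
  await sleepAsync(10)   # stand-in for the real transport write

proc write(c: FakeChannel, msg: seq[byte]) {.async.} =
  # Bump the counters before awaiting and release them in `finally`,
  # so they stay correct even if the write fails.
  c.writes += 1
  c.writesBytes += msg.len
  try:
    await c.rawWrite(msg)
  finally:
    c.writes -= 1
    c.writesBytes -= msg.len

proc queuedSendBytes(c: FakeChannel): int = c.writesBytes

when isMainModule:
  let c = FakeChannel()
  waitFor c.write(@[1'u8, 2, 3])
  assert c.queuedSendBytes == 0   # everything drained once the write completes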

View File

@@ -178,7 +178,12 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "pushing data to channel", m, channel, len = data.len
try:
let start = Moment.now()
await channel.pushData(data)
let delay = Moment.now() - start
if delay > 50.milliseconds and m.connection.shortAgent == "lodestar":
debug "pushData was slow!", delay, protocol=channel.protocol, peer = $m.connection.peerId
trace "pushed data to channel", m, channel, len = data.len
except LPStreamClosedError as exc:
# Channel is being closed, but `cleanupChann` was not yet triggered.
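
A hedged sketch of the timing instrumentation added above: measure how long the awaited push takes and log when it crosses a threshold. pushData and pushWithTiming are illustrative stand-ins, and std/monotimes replaces chronos' Moment.

import std/[asyncdispatch, monotimes, times]

proc pushData(data: seq[byte]) {.async.} =
  await sleepAsync(60)   # pretend the downstream consumer is slow

proc pushWithTiming(data: seq[byte]) {.async.} =
  let start = getMonoTime()
  await pushData(data)
  let delay = getMonoTime() - start
  if delay > initDuration(milliseconds = 50):
    echo "pushData was slow! delay=", delay, " len=", data.len

when isMainModule:
  waitFor pushWithTiming(@[0'u8])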

View File

@@ -176,6 +176,8 @@ proc sendQueueBytes(channel: YamuxChannel, limit: bool = false): int =
for (elem, sent, _) in channel.sendQueue:
result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent))
method queuedSendBytes*(channel: YamuxChannel): int = channel.sendQueueBytes()
proc actuallyClose(channel: YamuxChannel) {.async.} =
if channel.closedLocally and channel.sendQueue.len == 0 and
channel.closedRemotely.done():

View File

@@ -158,8 +158,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
peer.appScore = stats.appScore
peer.behaviourPenalty = stats.behaviourPenalty
peer.iWantBudget = IWantPeerBudget
peer.iHaveBudget = IHavePeerBudget
method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
case event.kind

View File

@@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[tables, sequtils, sets, algorithm]
import std/[tables, sequtils, sets, algorithm, deques]
import chronos, chronicles, metrics
import "."/[types, scoring]
import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]
@@ -31,7 +31,7 @@ declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics in mesh with no
declareGauge(libp2p_gossipsub_low_peers_topics, "number of topics in mesh with at least one but below dlow peers")
declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)")
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
declareSummary(libp2p_gossipsub_mcache_hit, "ratio of successful IWANT message cache lookups")
declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
g.withPeerStats(p.peerId) do (stats: var PeerStats):
@@ -272,28 +272,32 @@ proc handleIHave*(g: GossipSub,
proc handleIWant*(g: GossipSub,
peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} =
var messages: seq[Message]
if peer.shortAgent == "lodestar":
info "Got IWANT", peerType=peer.shortAgent, peerId=peer.peerId, msgs=iwants.mapIt(it.messageIds.len)
var
messages: seq[Message]
invalidRequests = 0
if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer, score = peer.score
elif peer.iWantBudget <= 0:
trace "iwant: ignoring out of budget peer", peer, score = peer.score
else:
let deIwants = iwants.deduplicate()
for iwant in deIwants:
let deIwantsMsgs = iwant.messageIds.deduplicate()
for mid in deIwantsMsgs:
for iwant in iwants:
for mid in iwant.messageIds:
trace "peer sent iwant", peer, messageID = mid
# canAskIWant will only return true once for a specific message
if not peer.canAskIWant(mid):
libp2p_gossipsub_received_iwants.inc(1, labelValues=["notsent"])
invalidRequests.inc()
if invalidRequests > 20:
libp2p_gossipsub_received_iwants.inc(1, labelValues=["skipped"])
return messages
continue
let msg = g.mcache.get(mid)
if msg.isSome:
libp2p_gossipsub_mcache_hit.observe(1)
# avoid spam
if peer.iWantBudget > 0:
messages.add(msg.get())
dec peer.iWantBudget
else:
break
libp2p_gossipsub_received_iwants.inc(1, labelValues=["correct"])
messages.add(msg.get())
else:
libp2p_gossipsub_mcache_hit.observe(0)
libp2p_gossipsub_received_iwants.inc(1, labelValues=["unknown"])
return messages
proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} =
@@ -586,8 +590,6 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
cacheWindowSize += midsSeq.len
trace "got messages to emit", size=midsSeq.len
# not in spec
# similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
# and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
@@ -616,8 +618,13 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
g.rng.shuffle(allPeers)
allPeers.setLen(target)
#info "got messages to emit", size=midsSeq.len, topic, msgs=midsSeq.mapIt(it.toHex()), peers=allPeers.len, peerValues=allPeers.mapIt((it.shortAgent, it.queuedSendBytes, $it.peerId))
let msgIdsAsSet = ihave.messageIds.toHashSet()
for peer in allPeers:
control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
peer.sentIHaves[^1].incl(msgIdsAsSet)
libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
@@ -628,7 +635,9 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
# reset IHAVE cap
block:
for peer in g.peers.values:
peer.iWantBudget = IWantPeerBudget
peer.sentIHaves.addFirst(default(HashSet[MessageId]))
if peer.sentIHaves.len > g.parameters.historyLength:
discard peer.sentIHaves.popLast()
peer.iHaveBudget = IHavePeerBudget
var meshMetrics = MeshMetrics()
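
Stripped of metrics and logging, the IWANT handling above boils down to: answer only message ids the peer was actually sent an IHAVE for, count everything else as an invalid request, and stop serving the peer once that count passes a small cap. A reduced sketch under those assumptions; this handleIWant operates on plain tables and sets rather than the real GossipSub types.

import std/[sets, tables]

type MessageId = seq[byte]

proc handleIWant(advertised: var HashSet[MessageId],
                 cache: Table[MessageId, string],
                 wanted: seq[MessageId]): seq[string] =
  var invalidRequests = 0
  for mid in wanted:
    if mid notin advertised:
      # The peer asks for something we never advertised to it.
      invalidRequests.inc()
      if invalidRequests > 20:   # same cap as in the hunk above
        return                   # give up on an abusive peer
      continue
    advertised.excl(mid)         # each advertised id may be asked for once
    if mid in cache:
      result.add(cache[mid])

when isMainModule:
  var adv = toHashSet([@[1'u8], @[2'u8]])
  let cache = {@[1'u8]: "msg1", @[2'u8]: "msg2"}.toTable
  assert handleIWant(adv, cache, @[@[1'u8], @[3'u8], @[2'u8]]) == @["msg1", "msg2"]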

View File

@@ -48,7 +48,6 @@ const
const
BackoffSlackTime* = 2 # seconds
IWantPeerBudget* = 25 # 25 messages per second ( reset every heartbeat )
IHavePeerBudget* = 10
# the max amount of IHave to expose, not by spec, but go as example
# rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572

View File

@@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[sequtils, strutils, tables, hashes, options]
import std/[sequtils, strutils, tables, hashes, options, sets, deques]
import stew/results
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
@@ -62,18 +62,28 @@ type
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
score*: float64
iWantBudget*: int
sentIHaves*: Deque[HashSet[MessageId]]
iHaveBudget*: int
maxMessageSize: int
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
when defined(libp2p_agents_metrics):
shortAgent*: string
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
{.gcsafe, raises: [Defect].}
when defined(libp2p_agents_metrics):
func shortAgent*(p: PubSubPeer): string =
if p.sendConn.isNil:
"unknown"
else:
#TODO the sendConn is setup before identify,
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent
proc queuedSendBytes*(p: PubSubPeer): int =
if p.sendConn.isNil: -2
else: p.sendConn.queuedSendBytes()
func hash*(p: PubSubPeer): Hash =
p.peerId.hash
@@ -150,7 +160,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
# metrics
libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])
await p.handler(p, rmsg.get())
discard p.handler(p, rmsg.get())
finally:
await conn.close()
except CancelledError:
@@ -286,6 +296,13 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
asyncSpawn p.sendEncoded(encoded)
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
if msgId in sentIHave:
sentIHave.excl(msgId)
return true
return false
proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
@@ -294,7 +311,7 @@ proc new*(
codec: string,
maxMessageSize: int): T =
T(
result = T(
getConn: getConn,
onEvent: onEvent,
codec: codec,
@@ -302,3 +319,4 @@ proc new*(
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
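
An illustrative sketch of the sentIHaves bookkeeping introduced above: one HashSet of advertised message ids per history window, kept in a deque that is rotated on every heartbeat, with canAskIWant consuming an id at most once. Peer here is a toy type, not the library's PubSubPeer.

import std/[deques, sets]

type
  MessageId = seq[byte]
  Peer = ref object
    sentIHaves: Deque[HashSet[MessageId]]

proc canAskIWant(p: Peer, msgId: MessageId): bool =
  for sentIHave in p.sentIHaves.mitems():
    if msgId in sentIHave:
      sentIHave.excl(msgId)   # answer a given id at most once
      return true
  return false

proc onHeartbeat(p: Peer, historyLength: int) =
  # Open a fresh window and drop the oldest one beyond the history length.
  p.sentIHaves.addFirst(default(HashSet[MessageId]))
  if p.sentIHaves.len > historyLength:
    discard p.sentIHaves.popLast()

when isMainModule:
  let p = Peer(sentIHaves: initDeque[HashSet[MessageId]]())
  p.onHeartbeat(historyLength = 5)     # open the first window
  p.sentIHaves[0].incl(@[42'u8])
  assert p.canAskIWant(@[42'u8])       # first request is honoured
  assert not p.canAskIWant(@[42'u8])   # repeats are rejected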

View File

@@ -14,7 +14,7 @@ else:
import std/[tables]
import chronos/timer
import chronos/timer, stew/results
const Timeout* = 10.seconds # default timeout in ms
@@ -22,6 +22,7 @@ type
TimedEntry*[K] = ref object of RootObj
key: K
addedAt: Moment
expiresAt: Moment
next, prev: TimedEntry[K]
TimedCache*[K] = object of RootObj
@@ -30,14 +31,13 @@ type
timeout: Duration
func expire*(t: var TimedCache, now: Moment = Moment.now()) =
let expirationLimit = now - t.timeout
while t.head != nil and t.head.addedAt < expirationLimit:
while t.head != nil and t.head.expiresAt < now:
t.entries.del(t.head.key)
t.head.prev = nil
t.head = t.head.next
if t.head == nil: t.tail = nil
func del*[K](t: var TimedCache[K], key: K): bool =
func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] =
# Removes existing key from cache, returning false if it was not present
var item: TimedEntry[K]
if t.entries.pop(key, item):
@@ -46,9 +46,9 @@ func del*[K](t: var TimedCache[K], key: K): bool =
if item.next != nil: item.next.prev = item.prev
if item.prev != nil: item.prev.next = item.next
true
Opt.some(item)
else:
false
Opt.none(TimedEntry[K])
func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
# Puts k in cache, returning true if the item was already present and false
@@ -56,9 +56,13 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
# refreshed.
t.expire(now)
var res = t.del(k) # Refresh existing item
var previous = t.del(k) # Refresh existing item
let node = TimedEntry[K](key: k, addedAt: now)
let addedAt =
if previous.isSome: previous.get().addedAt
else: now
let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout)
if t.head == nil:
t.tail = node
@@ -66,7 +70,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
else:
# search from tail because typically that's where we add when now grows
var cur = t.tail
while cur != nil and node.addedAt < cur.addedAt:
while cur != nil and node.expiresAt < cur.expiresAt:
cur = cur.prev
if cur == nil:
@@ -82,7 +86,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
t.entries[k] = node
res
previous.isSome()
func contains*[K](t: TimedCache[K], k: K): bool =
k in t.entries
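
The TimedCache hunks above switch from recomputing "addedAt + timeout" on every comparison to storing an explicit expiresAt, while a refresh keeps the original addedAt and put() still reports whether the key was already present. A reduced, table-only sketch of those semantics; TimedSet is a made-up name, and the real cache additionally keeps an ordered linked list so expiry does not scan every entry.

import std/[options, tables, monotimes, times]

type
  Entry[K] = object
    key: K
    addedAt, expiresAt: MonoTime

  TimedSet[K] = object
    entries: Table[K, Entry[K]]
    timeout: Duration

proc expire[K](t: var TimedSet[K], now = getMonoTime()) =
  # Drop every entry whose own expiresAt has passed.
  var expired: seq[K]
  for k, e in t.entries:
    if e.expiresAt < now: expired.add(k)
  for k in expired: t.entries.del(k)

proc put[K](t: var TimedSet[K], k: K, now = getMonoTime()): bool =
  # Returns true if k was already present; a refresh keeps the original
  # addedAt but pushes expiresAt forward by the timeout.
  t.expire(now)
  let previous =
    if k in t.entries: some(t.entries[k])
    else: none(Entry[K])
  let addedAt =
    if previous.isSome: previous.get().addedAt
    else: now
  t.entries[k] = Entry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout)
  return previous.isSome

when isMainModule:
  var cache = TimedSet[string](timeout: initDuration(seconds = 10))
  assert not cache.put("a")
  assert cache.put("a")   # refresh reports that the key was already known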

View File

@@ -173,6 +173,8 @@ method closed*(s: LPStream): bool {.base, public.} =
method atEof*(s: LPStream): bool {.base, public.} =
s.isEof
method queuedSendBytes*(s: LPStream): int {.base.} = -1
method readOnce*(
s: LPStream,
pbytes: pointer,
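
The base method above gives every LPStream a queuedSendBytes accessor with a "don't know" sentinel, which the mplex and yamux channels override in the earlier hunks. A minimal sketch of that dynamic-dispatch pattern with toy types:

type
  Stream = ref object of RootObj
  CountingStream = ref object of Stream
    pending: int   # bytes currently queued for sending

method queuedSendBytes(s: Stream): int {.base.} = -1   # -1 means "unknown"
method queuedSendBytes(s: CountingStream): int = s.pending

when isMainModule:
  let s: Stream = CountingStream(pending: 7)
  assert s.queuedSendBytes == 7   # dispatches to the override at runtime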

View File

@@ -63,5 +63,5 @@ when defined(libp2p_agents_metrics):
err("toLowerAscii failed")
const
KnownLibP2PAgents* {.strdefine.} = ""
KnownLibP2PAgents* {.strdefine.} = "nim-libp2p"
KnownLibP2PAgentsSeq* = KnownLibP2PAgents.safeToLowerAscii().tryGet().split(",")
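
The constant above uses Nim's {.strdefine.} pragma, so the default agent list can be replaced at build time. A tiny sketch of that knob; KnownAgents is a hypothetical name, the real constant is KnownLibP2PAgents.

import std/strutils

# Override with e.g. `nim c -d:KnownAgents=lodestar,prysm main.nim`.
const KnownAgents {.strdefine.} = "nim-libp2p"
const KnownAgentsSeq = KnownAgents.toLowerAscii().split(",")

when isMainModule:
  echo KnownAgentsSeq   # @["nim-libp2p"] unless overridden at compile time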

View File

@@ -19,7 +19,9 @@ import utils,
protocols/pubsub/pubsub,
protocols/pubsub/floodsub,
protocols/pubsub/rpc/messages,
protocols/pubsub/peertable]
protocols/pubsub/peertable,
protocols/pubsub/pubsubpeer
]
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
import ../helpers
@@ -62,6 +64,14 @@ suite "FloodSub":
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
check (await completionFut.wait(5.seconds)) == true
when defined(libp2p_agents_metrics):
let
agentA = nodes[0].peers[nodes[1].switch.peerInfo.peerId].shortAgent
agentB = nodes[1].peers[nodes[0].switch.peerInfo.peerId].shortAgent
check:
agentA == "nim-libp2p"
agentB == "nim-libp2p"
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()

View File

@@ -2,7 +2,7 @@ include ../../libp2p/protocols/pubsub/gossipsub
{.used.}
import options
import std/[options, deques]
import stew/byteutils
import ../../libp2p/builders
import ../../libp2p/errors
@@ -713,6 +713,7 @@ suite "GossipSub internal":
let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
let msg = ControlIWant(
messageIDs: @[id, id, id]
)