Compare commits


8 Commits

Author         SHA1        Message                                                   Date
kaiserd        a16293bdca  Merge branch 'master' into penalty-non-priority-queue    2024-05-29 15:48:08 +02:00
diegomrsantos  47990be775  Merge branch 'master' into penalty-non-priority-queue    2024-05-17 14:49:12 +02:00
Diego          8014a848a0  add constant                                              2024-04-29 20:19:19 +02:00
diegomrsantos  87110e551a  Merge branch 'unstable' into penalty-non-priority-queue  2024-04-21 20:11:44 +02:00
Diego          917dbf83cd  set metric to zero when cleaning non prio queue           2024-04-10 12:39:11 +02:00
Diego          0c61f12b85  apply penalty even if stream is closed                    2024-04-10 08:35:54 +02:00
Diego          a6237bd1c1  clear non priority queue when score is negative           2024-04-10 00:40:29 +02:00
Diego          d9a60f339e  apply behaviour penalty                                   2024-04-10 00:40:28 +02:00
2 changed files with 21 additions and 22 deletions

libp2p/protocols/pubsub/gossipsub.nim

@@ -660,6 +660,7 @@ proc onHeartbeat(g: GossipSub) =
         g.pruned(peer, t)
         g.mesh.removePeer(t, peer)
         prunes &= peer
+        peer.clearNonPriorityQueue()
     if prunes.len > 0:
       let prune = RPCMsg(control: some(ControlMessage(
         prune: @[ControlPrune(

libp2p/protocols/pubsub/pubsubpeer.nim

@@ -35,10 +35,10 @@ when defined(pubsubpeer_queue_metrics):
   declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
   declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
 
-declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity")
-
 const
   DefaultMaxNumElementsInNonPriorityQueue* = 1024
+  BehaviourPenaltyForNonPriorityQueueOverLimit = 0.0001 # this value is quite arbitrary and was found empirically
+    # to result in a behaviourPenalty around [0.1, 0.2] when the score is updated.
 
 type
   PeerRateLimitError* = object of CatchableError
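
Note: as a rough standalone illustration of the empirical comment on the new constant (not part of the diff; the count of 1500 discards between two score updates is an invented assumption), repeatedly applying the penalty lands in the quoted band:

const penaltyPerDiscard = 0.0001 # mirrors BehaviourPenaltyForNonPriorityQueueOverLimit

var behaviourPenalty = 0.0
for _ in 0 ..< 1500:             # hypothetical number of over-limit discards
  behaviourPenalty += penaltyPerDiscard

echo behaviourPenalty            # ~0.15, inside the empirical [0.1, 0.2] band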
@@ -93,7 +93,6 @@ type
     rpcmessagequeue: RpcMessageQueue
     maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
-    disconnected: bool
 
   RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
     {.gcsafe, raises: [].}
@@ -194,7 +193,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
 
 proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
   if p.sendConn != nil:
-    trace "Removing send connection", p, conn = p.sendConn
+    debug "Removing send connection", p, conn = p.sendConn
     await p.sendConn.close()
     p.sendConn = nil
@@ -222,7 +221,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
     # remote peer - if we had multiple channels up and one goes down, all
     # stop working so we make an effort to only keep a single channel alive
 
-    trace "Get new send connection", p, newConn
+    debug "Get new send connection", p, newConn
 
     # Careful of race conditions here.
     # Topic subscription relies on either connectedFut
@@ -238,19 +237,15 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
finally:
await p.closeSendConn(PubSubPeerEventKind.StreamClosed)
proc connectImpl(p: PubSubPeer) {.async.} =
proc connectImpl(peer: PubSubPeer) {.async.} =
try:
# Keep trying to establish a connection while it's possible to do so - the
# send connection might get disconnected due to a timeout or an unrelated
# issue so we try to get a new on
while true:
if p.disconnected:
if not p.connectedFut.finished:
p.connectedFut.complete()
return
await connectOnce(p)
await connectOnce(peer)
except CatchableError as exc: # never cancelled
debug "Could not establish send connection", msg = exc.msg
debug "Could not establish send connection", peer, msg = exc.msg
proc connect*(p: PubSubPeer) =
if p.connected:
@@ -284,6 +279,13 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
       value = p.rpcmessagequeue.sendPriorityQueue.len.int64,
       labelValues = [$p.peerId])
 
+proc clearNonPriorityQueue*(p: PubSubPeer) =
+  if len(p.rpcmessagequeue.nonPriorityQueue) > 0:
+    p.rpcmessagequeue.nonPriorityQueue.clear()
+
+  when defined(pubsubpeer_queue_metrics):
+    libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+
 proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
   # Continuation for a pending `sendMsg` future from below
   try:
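
Note: to show what the new proc does in isolation, here is a minimal self-contained model (a sketch under stated assumptions: FakePeer and queueGauge are invented stand-ins for PubSubPeer and the Prometheus gauge):

import std/deques

type FakePeer = object
  nonPriorityQueue: Deque[seq[byte]]
  queueGauge: int # stand-in for libp2p_gossipsub_non_priority_queue_size

proc clearNonPriorityQueue(p: var FakePeer) =
  if p.nonPriorityQueue.len > 0:
    p.nonPriorityQueue.clear()
  p.queueGauge = 0 # mirrors the metric reset in the real proc

var peer = FakePeer(nonPriorityQueue: initDeque[seq[byte]]())
peer.nonPriorityQueue.addLast(@[1'u8, 2, 3])
peer.queueGauge = 1
peer.clearNonPriorityQueue()
assert peer.nonPriorityQueue.len == 0 and peer.queueGauge == 0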
@@ -348,7 +350,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] =
     p.rpcmessagequeue.nonPriorityQueue.len()) == 0
 
   if msg.len <= 0:
-    debug "empty message, skipping", p, msg = shortLog(msg)
+    debug "empty message, skipping", peer = p, msg = shortLog(msg)
     Future[void].completed()
   elif msg.len > p.maxMessageSize:
     info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
@@ -362,12 +364,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] =
     f
   else:
     if len(p.rpcmessagequeue.nonPriorityQueue) >= p.maxNumElementsInNonPriorityQueue:
-      if not p.disconnected:
-        p.disconnected = true
-        libp2p_pubsub_disconnects_over_non_priority_queue_limit.inc()
-        p.closeSendConn(PubSubPeerEventKind.DisconnectionRequested)
-      else:
-        Future[void].completed()
+      p.behaviourPenalty += BehaviourPenaltyForNonPriorityQueueOverLimit
+      trace "Peer has reached maxNumElementsInNonPriorityQueue. Discarding message and applying behaviour penalty.", peer = p, score = p.score,
+        behaviourPenalty = p.behaviourPenalty, agent = p.getAgent()
+      Future[void].completed()
     else:
       let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
       when defined(pubsubpeer_queue_metrics):
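
Note: the overflow branch above no longer disconnects the peer. A hedged, self-contained sketch of the new behaviour (simplified assumptions: strings instead of encoded RPC messages, no chronos futures, and maxElems = 4 is arbitrary):

import std/deques

const
  maxElems = 4     # stand-in for maxNumElementsInNonPriorityQueue
  penalty = 0.0001 # stand-in for BehaviourPenaltyForNonPriorityQueueOverLimit

var
  queue = initDeque[string]()
  behaviourPenalty = 0.0

proc trySend(msg: string) =
  if queue.len >= maxElems:
    behaviourPenalty += penalty # discard the message and penalise; connection stays up
  else:
    queue.addLast(msg)

for i in 0 ..< 10:
  trySend("msg" & $i)

assert queue.len == maxElems  # only the first 4 messages were queued
assert behaviourPenalty > 0.0 # each of the remaining 6 added a penalty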
@@ -480,11 +480,9 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
     p.rpcmessagequeue.sendNonPriorityTask.cancelSoon()
     p.rpcmessagequeue.sendNonPriorityTask = nil
   p.rpcmessagequeue.sendPriorityQueue.clear()
-  p.rpcmessagequeue.nonPriorityQueue.clear()
   when defined(pubsubpeer_queue_metrics):
     libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
-    libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
-
+  p.clearNonPriorityQueue()
 
 proc new(T: typedesc[RpcMessageQueue]): T =
   return T(