mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 17:18:27 -05:00
Compare commits
8 Commits
test-non-m
...
penalty-no
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a16293bdca | ||
|
|
47990be775 | ||
|
|
8014a848a0 | ||
|
|
87110e551a | ||
|
|
917dbf83cd | ||
|
|
0c61f12b85 | ||
|
|
a6237bd1c1 | ||
|
|
d9a60f339e |
@@ -660,6 +660,7 @@ proc onHeartbeat(g: GossipSub) =
|
||||
g.pruned(peer, t)
|
||||
g.mesh.removePeer(t, peer)
|
||||
prunes &= peer
|
||||
peer.clearNonPriorityQueue()
|
||||
if prunes.len > 0:
|
||||
let prune = RPCMsg(control: some(ControlMessage(
|
||||
prune: @[ControlPrune(
|
||||
|
||||
@@ -35,10 +35,10 @@ when defined(pubsubpeer_queue_metrics):
|
||||
declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
|
||||
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
|
||||
|
||||
declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity")
|
||||
|
||||
const
|
||||
DefaultMaxNumElementsInNonPriorityQueue* = 1024
|
||||
BehaviourPenaltyFoNonPriorityQueueOverLimit = 0.0001 # this value is quite arbitrary and was found empirically
|
||||
# to result in a behaviourPenalty around [0.1, 0.2] when the score is updated.
|
||||
|
||||
type
|
||||
PeerRateLimitError* = object of CatchableError
|
||||
@@ -93,7 +93,6 @@ type
|
||||
|
||||
rpcmessagequeue: RpcMessageQueue
|
||||
maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
|
||||
disconnected: bool
|
||||
|
||||
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
|
||||
{.gcsafe, raises: [].}
|
||||
@@ -194,7 +193,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||
|
||||
proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
|
||||
if p.sendConn != nil:
|
||||
trace "Removing send connection", p, conn = p.sendConn
|
||||
debug "Removing send connection", p, conn = p.sendConn
|
||||
await p.sendConn.close()
|
||||
p.sendConn = nil
|
||||
|
||||
@@ -222,7 +221,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
|
||||
# remote peer - if we had multiple channels up and one goes down, all
|
||||
# stop working so we make an effort to only keep a single channel alive
|
||||
|
||||
trace "Get new send connection", p, newConn
|
||||
debug "Get new send connection", p, newConn
|
||||
|
||||
# Careful to race conditions here.
|
||||
# Topic subscription relies on either connectedFut
|
||||
@@ -238,19 +237,15 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
|
||||
finally:
|
||||
await p.closeSendConn(PubSubPeerEventKind.StreamClosed)
|
||||
|
||||
proc connectImpl(p: PubSubPeer) {.async.} =
|
||||
proc connectImpl(peer: PubSubPeer) {.async.} =
|
||||
try:
|
||||
# Keep trying to establish a connection while it's possible to do so - the
|
||||
# send connection might get disconnected due to a timeout or an unrelated
|
||||
# issue so we try to get a new on
|
||||
while true:
|
||||
if p.disconnected:
|
||||
if not p.connectedFut.finished:
|
||||
p.connectedFut.complete()
|
||||
return
|
||||
await connectOnce(p)
|
||||
await connectOnce(peer)
|
||||
except CatchableError as exc: # never cancelled
|
||||
debug "Could not establish send connection", msg = exc.msg
|
||||
debug "Could not establish send connection", peer, msg = exc.msg
|
||||
|
||||
proc connect*(p: PubSubPeer) =
|
||||
if p.connected:
|
||||
@@ -284,6 +279,13 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
|
||||
value = p.rpcmessagequeue.sendPriorityQueue.len.int64,
|
||||
labelValues = [$p.peerId])
|
||||
|
||||
proc clearNonPriorityQueue*(p: PubSubPeer) =
|
||||
if len(p.rpcmessagequeue.nonPriorityQueue) > 0:
|
||||
p.rpcmessagequeue.nonPriorityQueue.clear()
|
||||
|
||||
when defined(pubsubpeer_queue_metrics):
|
||||
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
|
||||
|
||||
proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
|
||||
# Continuation for a pending `sendMsg` future from below
|
||||
try:
|
||||
@@ -348,7 +350,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
|
||||
p.rpcmessagequeue.nonPriorityQueue.len()) == 0
|
||||
|
||||
if msg.len <= 0:
|
||||
debug "empty message, skipping", p, msg = shortLog(msg)
|
||||
debug "empty message, skipping", peer = p, msg = shortLog(msg)
|
||||
Future[void].completed()
|
||||
elif msg.len > p.maxMessageSize:
|
||||
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
|
||||
@@ -362,12 +364,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
|
||||
f
|
||||
else:
|
||||
if len(p.rpcmessagequeue.nonPriorityQueue) >= p.maxNumElementsInNonPriorityQueue:
|
||||
if not p.disconnected:
|
||||
p.disconnected = true
|
||||
libp2p_pubsub_disconnects_over_non_priority_queue_limit.inc()
|
||||
p.closeSendConn(PubSubPeerEventKind.DisconnectionRequested)
|
||||
else:
|
||||
Future[void].completed()
|
||||
p.behaviourPenalty += BehaviourPenaltyFoNonPriorityQueueOverLimit
|
||||
trace "Peer has reached maxNumElementsInNonPriorityQueue. Discarding message and applying behaviour penalty.", peer = p, score = p.score,
|
||||
behaviourPenalty = p.behaviourPenalty, agent = p.getAgent()
|
||||
Future[void].completed()
|
||||
else:
|
||||
let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
|
||||
when defined(pubsubpeer_queue_metrics):
|
||||
@@ -480,11 +480,9 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
|
||||
p.rpcmessagequeue.sendNonPriorityTask.cancelSoon()
|
||||
p.rpcmessagequeue.sendNonPriorityTask = nil
|
||||
p.rpcmessagequeue.sendPriorityQueue.clear()
|
||||
p.rpcmessagequeue.nonPriorityQueue.clear()
|
||||
|
||||
when defined(pubsubpeer_queue_metrics):
|
||||
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
|
||||
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
|
||||
p.clearNonPriorityQueue()
|
||||
|
||||
proc new(T: typedesc[RpcMessageQueue]): T =
|
||||
return T(
|
||||
|
||||
Reference in New Issue
Block a user