Mirror of https://github.com/vacp2p/nim-libp2p.git, synced 2026-01-09 14:28:11 -05:00
Compare commits: v1.7.0...staggered_ (2 commits)
| Author | SHA1 | Date |
|---|---|---|
| | a478b8f4d8 | |
| | 7d477732ad | |
@@ -432,10 +432,19 @@ proc validateAndRelay(g: GossipSub,
          break
    toSendPeers.excl(peersWhoSentIdontwant) # avoids "len(s) == length: the length of the HashSet changed while iterating over it" [AssertionDefect]

    # We first send to the outbound peers to avoid peers sending the same message to each other
    var outboundPeers: seq[PubSubPeer]
    for mpeer in toSendPeers:
      if mpeer.outbound():
        outboundPeers.add(mpeer)
    if outboundPeers.len > 0:
      g.broadcast(outboundPeers, RPCMsg(messages: @[msg]), isHighPriority = false, some(saltedId))
      toSendPeers.excl(outboundPeers.toHashSet)

    # In theory, if topics are the same in all messages, we could batch - we'd
    # also have to be careful to only include validated messages
    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
    trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, some(saltedId))
    trace "forwarded message to peers", peers = toSendPeers.len + outboundPeers.len, msgId, peer

    if g.knownTopics.contains(topic):
      libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
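
The added block stages the relay in validateAndRelay: outbound mesh peers are served first, and only then the remaining peers, so two peers that forward to each other are less likely to push the same message simultaneously. A minimal, self-contained sketch of that partitioning idea (`Peer`, `deliver` and `relayStaggered` are illustrative stand-ins, not the nim-libp2p types):

```nim
import std/sequtils

type Peer = object
  id: string
  isOutbound: bool

proc deliver(peers: seq[Peer], msg: string) =
  for p in peers:
    echo "send '", msg, "' to ", p.id

proc relayStaggered(toSend: seq[Peer], msg: string) =
  # outbound mesh members get the message first...
  let outboundPeers = toSend.filterIt(it.isOutbound)
  if outboundPeers.len > 0:
    deliver(outboundPeers, msg)
  # ...then the remaining (inbound) peers
  deliver(toSend.filterIt(not it.isOutbound), msg)

when isMainModule:
  relayStaggered(@[
    Peer(id: "a", isOutbound: true),
    Peer(id: "b", isOutbound: false)
  ], "hello")
```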
@@ -544,6 +553,11 @@ method rpcHandler*(g: GossipSub,

        libp2p_gossipsub_duplicate.inc()

        # Don't relay to the peers from which we already received
        # We do it for large messages only
        if msg.data.len > msgId.len * 10:
          peer.heDontWants[^1].incl(msgIdSalted)

        # onto the next message
        continue
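
This hunk records a duplicate in `heDontWants` only when the payload is substantially larger than its message id, since suppressing retransmission is only worth the bookkeeping for large messages. A rough sketch of that threshold, using hypothetical names:

```nim
# Only payloads much larger than the message id are worth marking as "don't want".
proc worthSuppressing(dataLen, msgIdLen: int): bool =
  dataLen > msgIdLen * 10

when isMainModule:
  doAssert worthSuppressing(4096, 32)      # large payload: record the duplicate
  doAssert not worthSuppressing(100, 32)   # tiny payload: skip the bookkeeping
```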
@@ -715,15 +729,23 @@ method publish*(g: GossipSub,

  g.mcache.put(msgId, msg)

  # We first send to the outbound peers
  var outboundPeers: seq[PubSubPeer]
  for mpeer in peers:
    if mpeer.outbound():
      outboundPeers.add(mpeer)
  if outboundPeers.len > 0:
    g.broadcast(outboundPeers, RPCMsg(messages: @[msg]), isHighPriority = true)
    peers.excl(outboundPeers.toHashSet)
  g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

  if g.knownTopics.contains(topic):
    libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
    libp2p_pubsub_messages_published.inc((peers.len + outboundPeers.len).int64, labelValues = [topic])
  else:
    libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
    libp2p_pubsub_messages_published.inc((peers.len + outboundPeers.len).int64, labelValues = ["generic"])

  trace "Published message to peers", peers=peers.len
  return peers.len
  trace "Published message to peers", peers=peers.len + outboundPeers.len
  return (peers.len + outboundPeers.len)

proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
  if id notin g.peers:
@@ -28,7 +28,8 @@ import ./errors as pubsub_errors,
       ../../peerid,
       ../../peerinfo,
       ../../errors,
       ../../utility
       ../../utility,
       ../../utils/semaphore

import stew/results
export results
@@ -79,6 +80,9 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab
declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])

const
  DefaultMaxSimultaneousTx* = 2

type
  InitializationError* = object of LPError
@@ -127,6 +131,7 @@ type
    rng*: ref HmacDrbgContext

    knownTopics*: HashSet[string]
    semTxLimit: AsyncSemaphore

method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
  ## handle peer disconnects
@@ -137,7 +142,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

  libp2p_pubsub_peers.set(p.peers.len.int64)

proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
  ## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
  ##
  ## Parameters:
@@ -149,13 +154,14 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.rai
  ## priority messages have been sent.

  trace "sending pubsub message to peer", peer, msg = shortLog(msg)
  peer.send(msg, p.anonymize, isHighPriority)
  peer.send(msg, p.anonymize, isHighPriority, saltedId)

proc broadcast*(
  p: PubSub,
  sendPeers: auto, # Iterable[PubSubPeer]
  msg: RPCMsg,
  isHighPriority: bool) {.raises: [].} =
  isHighPriority: bool,
  sid: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
  ## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
  ##
  ## Parameters:
@@ -210,12 +216,12 @@ proc broadcast*(

  if anyIt(sendPeers, it.hasObservers):
    for peer in sendPeers:
      p.send(peer, msg, isHighPriority)
      p.send(peer, msg, isHighPriority, sid)
  else:
    # Fast path that only encodes message once
    let encoded = encodeRpcMsg(msg, p.anonymize)
    for peer in sendPeers:
      asyncSpawn peer.sendEncoded(encoded, isHighPriority)
      asyncSpawn peer.sendEncoded(encoded, isHighPriority, sid)

proc sendSubs*(p: PubSub,
               peer: PubSubPeer,
@@ -310,7 +316,7 @@ method getOrCreatePeer*(
      p.onPubSubPeerEvent(peer, event)

  # create new pubsub peer
  let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
  let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, addr p.semTxLimit)
  debug "created new pubsub peer", peerId

  p.peers[peerId] = pubSubPeer
@@ -509,6 +515,7 @@ method initPubSub*(p: PubSub)
  p.observers = new(seq[PubSubObserver])
  if p.msgIdProvider == nil:
    p.msgIdProvider = defaultMsgIdProvider
  p.semTxLimit = newAsyncSemaphore(DefaultMaxSimultaneousTx)

method addValidator*(p: PubSub,
                     topic: varargs[string],
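
Here the `PubSub` instance creates one shared semaphore sized by `DefaultMaxSimultaneousTx`, and (per the `getOrCreatePeer` hunk above) each `PubSubPeer` receives a pointer to it, so all peers together hold at most two large transmissions at once. A toy, synchronous sketch of that shared-slot idea; the real semaphore is asynchronous (`acquire()` returns a future that completes when a slot frees up), and `Sem`, `newSem`, `tryAcquire` below are stand-ins, not the libp2p/utils/semaphore API:

```nim
# Minimal stand-in for a counting semaphore capping simultaneous large transmissions.
type Sem = object
  slots: int

proc newSem(n: int): Sem = Sem(slots: n)

proc tryAcquire(s: var Sem): bool =
  # take a transmission slot if one is free
  if s.slots > 0:
    dec s.slots
    true
  else:
    false

proc release(s: var Sem) =
  inc s.slots

when isMainModule:
  var txLimit = newSem(2)        # DefaultMaxSimultaneousTx = 2
  let peerSem = addr txLimit     # each peer stores a ptr to the shared semaphore
  doAssert peerSem[].tryAcquire()
  doAssert peerSem[].tryAcquire()
  doAssert not peerSem[].tryAcquire()  # a third concurrent transmission must wait
  peerSem[].release()
```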
@@ -19,7 +19,9 @@ import rpc/[messages, message, protobuf],
       ../../stream/connection,
       ../../crypto/crypto,
       ../../protobuf/minprotobuf,
       ../../utility
       ../../utility,
       ../../utils/semaphore
import atomics

export peerid, connection, deques
@@ -37,6 +39,12 @@ when defined(pubsubpeer_queue_metrics):

declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity")

var
  libp2p_gossipsub_staggerDontWantSave2: Atomic[int]
  libp2p_gossipsub_staggerDontWantSave3: Atomic[int]

export libp2p_gossipsub_staggerDontWantSave2, libp2p_gossipsub_staggerDontWantSave3

const
  DefaultMaxNumElementsInNonPriorityQueue* = 1024
@@ -59,11 +67,15 @@ type
  DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
  OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

  MessageWithSaltedId = object
    message: seq[byte]
    sid: SaltedId

  RpcMessageQueue* = ref object
    # Tracks async tasks for sending high-priority peer-published messages.
    sendPriorityQueue: Deque[Future[void]]
    # Queue for lower-priority messages, like "IWANT" replies and relay messages.
    nonPriorityQueue: AsyncQueue[seq[byte]]
    nonPriorityQueue: AsyncQueue[MessageWithSaltedId]
    # Task for processing non-priority message queue.
    sendNonPriorityTask: Future[void]
@@ -91,6 +103,7 @@ type
    behaviourPenalty*: float64 # the eventual penalty score
    overheadRateLimitOpt*: Opt[TokenBucket]

    semTxLimit: ptr AsyncSemaphore # controls max simultaneous transmissions to speed up individual receptions
    rpcmessagequeue: RpcMessageQueue
    maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
    disconnected: bool
@@ -107,6 +120,10 @@ when defined(libp2p_agents_metrics):
      # so we have to read the parent's short agent..
      p.sendConn.getWrapped().shortAgent

proc newMessageWithSaltedId(msg: seq[byte], saltedId: SaltedId): MessageWithSaltedId =
  result.message = msg
  result.sid = saltedId

proc getAgent*(peer: PubSubPeer): string =
  return
    when defined(libp2p_agents_metrics):
@@ -311,24 +328,49 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
    debug "No send connection", p, msg = shortLog(msg)
    return

  trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
  await sendMsgContinue(conn, conn.writeLp(msg))

proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] =
proc sendMsg(p: PubSubPeer, msg: seq[byte], saltedId: Option[SaltedId] = none(SaltedId)): Future[void] {.async.} =
  if p.sendConn == nil or p.sendConn.closed():
    await sendMsgSlow(p, msg)

  if p.sendConn != nil and not p.sendConn.closed():
    # Fast path that avoids copying msg (which happens for {.async.})
    let conn = p.sendConn

    trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
    let f = conn.writeLp(msg)
    if not f.completed():
      sendMsgContinue(conn, f)
    else:
      f
  else:
    sendMsgSlow(p, msg)

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] =
    if msg.len < 2000:
      try:
        let f = conn.writeLp(msg)
        await f
      except:
        await conn.close()
    else:
      await p.semTxLimit[].acquire()
      # We may have received a DontWant for the message
      if saltedId.isSome:
        for heDontWant in p.heDontWants:
          if saltedId.get in heDontWant:
            p.semTxLimit[].release()
            atomicInc(libp2p_gossipsub_staggerDontWantSave2)
            return
      try:
        let f = conn.writeLp(msg)
        let turns = (msg.len div 100_000) + 1
        for i in 1..turns:
          await f or sleepAsync(200.milliseconds) # sleep time should be adaptive to the peer bandwidth
          if not f.completed and saltedId.isSome:
            for heDontWant in p.heDontWants:
              if saltedId.get in heDontWant:
                atomicInc(libp2p_gossipsub_staggerDontWantSave3)
                break
        if not f.completed:
          await f.cancelAndWait()
          # asyncSpawn sendMsgContinue(conn, f)
        p.semTxLimit[].release()

      except LPStreamError as exc:
        p.semTxLimit[].release()
        await conn.close()

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)): Future[void] =
  ## Asynchronously sends an encoded message to a specified `PubSubPeer`.
  ##
  ## Parameters:
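
The reworked `sendMsg` above is the heart of the change: small messages are written directly, while large ones first take a semaphore slot, then wait on the write in 200 ms slices (roughly one slice per 100 kB), checking between slices whether the peer has meanwhile signalled IDONTWANT, and cancelling the write once the slice budget is exhausted. A self-contained chronos sketch of that wait loop, assuming hypothetical `slowWrite` and `peerSaysDontWant` stand-ins for `conn.writeLp` and the `heDontWants` lookup; note the sketch aborts the write on an IDONTWANT signal, whereas the patch shown above only counts those events:

```nim
import chronos

proc slowWrite(): Future[void] {.async.} =
  await sleepAsync(1.seconds)           # pretend the transfer takes a while

proc peerSaysDontWant(): bool =
  false                                 # would consult p.heDontWants in the real code

proc staggeredSend(msgLen: int) {.async.} =
  let f = slowWrite()
  let turns = (msgLen div 100_000) + 1  # one 200 ms slice per ~100 kB
  for _ in 1 .. turns:
    await f or sleepAsync(200.milliseconds)
    if f.completed:
      break
    if peerSaysDontWant():
      break                             # receiver already has the message
  if not f.completed:
    await f.cancelAndWait()             # stop spending bandwidth on it

when isMainModule:
  waitFor staggeredSend(250_000)
```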
@@ -354,7 +396,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
    Future[void].completed()
  elif isHighPriority or emptyQueues:
    let f = p.sendMsg(msg)
    let f = p.sendMsg(msg, saltedId)
    if not f.finished:
      p.rpcmessagequeue.sendPriorityQueue.addLast(f)
      when defined(pubsubpeer_queue_metrics):
@@ -369,10 +411,13 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
    else:
      Future[void].completed()
  else:
    let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
    when defined(pubsubpeer_queue_metrics):
      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
    f
    if not saltedId.isSome:
      Future[void].completed()
    else:
      let f = p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithSaltedId(msg, saltedId.get))
      when defined(pubsubpeer_queue_metrics):
        libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
      f

iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
  ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@@ -409,7 +454,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
  else:
    trace "message too big to send", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.raises: [].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
  ## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
  ##
  ## Parameters:
@@ -438,11 +483,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.

  if encoded.len > p.maxMessageSize and msg.messages.len > 1:
    for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority, saltedId)
  else:
    # If the message size is within limits, send it as is
    trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
    asyncSpawn p.sendEncoded(encoded, isHighPriority)
    asyncSpawn p.sendEncoded(encoded, isHighPriority, saltedId)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
  for sentIHave in p.sentIHaves.mitems():
@@ -467,7 +512,8 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
      discard await race(p.rpcmessagequeue.sendPriorityQueue[^1])
    when defined(pubsubpeer_queue_metrics):
      libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
    await p.sendMsg(msg)
    await p.sendMsg(msg.message, some(msg.sid))
    # asyncSpawn p.sendMsg(msg.message, some(msg.sid))

proc startSendNonPriorityTask(p: PubSubPeer) =
  debug "starting sendNonPriorityTask", p
@@ -489,7 +535,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
proc new(T: typedesc[RpcMessageQueue]): T =
  return T(
    sendPriorityQueue: initDeque[Future[void]](),
    nonPriorityQueue: newAsyncQueue[seq[byte]]()
    nonPriorityQueue: newAsyncQueue[MessageWithSaltedId]()
  )

proc new*(
@@ -499,6 +545,7 @@ proc new*(
  onEvent: OnEvent,
  codec: string,
  maxMessageSize: int,
  sem: ptr AsyncSemaphore,
  maxNumElementsInNonPriorityQueue: int = DefaultMaxNumElementsInNonPriorityQueue,
  overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
@@ -516,3 +563,4 @@ proc new*(
  result.sentIHaves.addFirst(default(HashSet[MessageId]))
  result.heDontWants.addFirst(default(HashSet[SaltedId]))
  result.startSendNonPriorityTask()
  result.semTxLimit = sem