mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 22:57:57 -05:00
Compare commits
1 Commits
block6Test
...
limit_forw
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fb83b8672 |
@@ -381,7 +381,7 @@ proc validateAndRelay(g: GossipSub,
|
||||
|
||||
# In theory, if topics are the same in all messages, we could batch - we'd
|
||||
# also have to be careful to only include validated messages
|
||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
|
||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, validMsgId = msgId)
|
||||
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
|
||||
for topic in msg.topicIds:
|
||||
if topic notin g.topics: continue
|
||||
@@ -497,6 +497,9 @@ method rpcHandler*(g: GossipSub,
|
||||
|
||||
libp2p_gossipsub_duplicate.inc()
|
||||
|
||||
if msg.data.len > msgId.len * 10: #Dont relay to the peers from which we already received (We just do it for large messages)
|
||||
peer.heDontWants[^1].incl(msgId)
|
||||
|
||||
# onto the next message
|
||||
continue
|
||||
|
||||
|
||||
@@ -138,7 +138,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
|
||||
|
||||
libp2p_pubsub_peers.set(p.peers.len.int64)
|
||||
|
||||
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
|
||||
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool, id: MessageId = @[]) {.raises: [].} =
|
||||
## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
|
||||
##
|
||||
## Parameters:
|
||||
@@ -150,13 +150,14 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.rai
|
||||
## priority messages have been sent.
|
||||
|
||||
trace "sending pubsub message to peer", peer, msg = shortLog(msg)
|
||||
asyncSpawn peer.send(msg, p.anonymize, isHighPriority)
|
||||
asyncSpawn peer.send(msg, p.anonymize, isHighPriority, id)
|
||||
|
||||
proc broadcast*(
|
||||
p: PubSub,
|
||||
sendPeers: auto, # Iteratble[PubSubPeer]
|
||||
msg: RPCMsg,
|
||||
isHighPriority: bool) {.raises: [].} =
|
||||
isHighPriority: bool,
|
||||
validMsgId: MessageId = @[]) {.raises: [].} =
|
||||
## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
|
||||
##
|
||||
## Parameters:
|
||||
@@ -211,12 +212,12 @@ proc broadcast*(
|
||||
|
||||
if anyIt(sendPeers, it.hasObservers):
|
||||
for peer in sendPeers:
|
||||
p.send(peer, msg, isHighPriority)
|
||||
p.send(peer, msg, isHighPriority, validMsgId)
|
||||
else:
|
||||
# Fast path that only encodes message once
|
||||
let encoded = encodeRpcMsg(msg, p.anonymize)
|
||||
for peer in sendPeers:
|
||||
asyncSpawn peer.sendEncoded(encoded, isHighPriority)
|
||||
asyncSpawn peer.sendEncoded(encoded, isHighPriority, validMsgId)
|
||||
|
||||
proc sendSubs*(p: PubSub,
|
||||
peer: PubSubPeer,
|
||||
|
||||
@@ -21,6 +21,8 @@ import rpc/[messages, message, protobuf],
|
||||
../../protobuf/minprotobuf,
|
||||
../../utility
|
||||
|
||||
#import gossipsub/libp2p_gossipsub_staggerDontWantSave
|
||||
|
||||
export peerid, connection, deques
|
||||
|
||||
logScope:
|
||||
@@ -52,11 +54,15 @@ type
|
||||
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
|
||||
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
|
||||
|
||||
MessageWithId = object
|
||||
message: seq[byte]
|
||||
msgId: MessageId
|
||||
|
||||
RpcMessageQueue* = ref object
|
||||
# Tracks async tasks for sending high-priority peer-published messages.
|
||||
sendPriorityQueue: Deque[Future[void]]
|
||||
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
|
||||
nonPriorityQueue: AsyncQueue[seq[byte]]
|
||||
nonPriorityQueue: AsyncQueue[MessageWithId]
|
||||
# Task for processing non-priority message queue.
|
||||
sendNonPriorityTask: Future[void]
|
||||
|
||||
@@ -95,6 +101,10 @@ when defined(libp2p_agents_metrics):
|
||||
#so we have to read the parents short agent..
|
||||
p.sendConn.getWrapped().shortAgent
|
||||
|
||||
proc newMessageWithId(msg: seq[byte], id: MessageId): MessageWithId =
|
||||
result.message = msg
|
||||
result.msgId = id
|
||||
|
||||
proc getAgent*(peer: PubSubPeer): string =
|
||||
return
|
||||
when defined(libp2p_agents_metrics):
|
||||
@@ -256,7 +266,7 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
|
||||
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
|
||||
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
|
||||
|
||||
proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
|
||||
proc sendMsg(p: PubSubPeer, msg: seq[byte], msgId: MessageId) {.async.} =
|
||||
if p.sendConn == nil:
|
||||
# Wait for a send conn to be setup. `connectOnce` will
|
||||
# complete this even if the sendConn setup failed
|
||||
@@ -269,6 +279,12 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
|
||||
|
||||
trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
|
||||
|
||||
if msgId.len > 0:
|
||||
for dontWants in p.heDontWants:
|
||||
if msgId in dontWants:
|
||||
#libp2p_gossipsub_staggerDontWantSave.inc()
|
||||
trace "Skipped sending msg/dontwant received from peer", conn, encoded = shortLog(msg)
|
||||
return
|
||||
try:
|
||||
await conn.writeLp(msg)
|
||||
trace "sent pubsub message to remote", conn
|
||||
@@ -281,7 +297,7 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
|
||||
|
||||
await conn.close() # This will clean up the send connection
|
||||
|
||||
proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.} =
|
||||
proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} =
|
||||
## Asynchronously sends an encoded message to a specified `PubSubPeer`.
|
||||
##
|
||||
## Parameters:
|
||||
@@ -302,13 +318,13 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.}
|
||||
|
||||
if isHighPriority:
|
||||
p.clearSendPriorityQueue()
|
||||
let f = p.sendMsg(msg)
|
||||
let f = p.sendMsg(msg, validMsgId)
|
||||
if not f.finished:
|
||||
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
|
||||
else:
|
||||
await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
|
||||
await p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithId(msg, validMsgId))
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
|
||||
trace "message queued", p, msg = shortLog(msg)
|
||||
@@ -348,7 +364,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
|
||||
else:
|
||||
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
|
||||
|
||||
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} =
|
||||
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} =
|
||||
## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
|
||||
##
|
||||
## Parameters:
|
||||
@@ -377,11 +393,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.
|
||||
|
||||
if encoded.len > p.maxMessageSize and msg.messages.len > 1:
|
||||
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
|
||||
await p.sendEncoded(encodedSplitMsg, isHighPriority)
|
||||
await p.sendEncoded(encodedSplitMsg, isHighPriority, validMsgId)
|
||||
else:
|
||||
# If the message size is within limits, send it as is
|
||||
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
|
||||
await p.sendEncoded(encoded, isHighPriority)
|
||||
await p.sendEncoded(encoded, isHighPriority, validMsgId)
|
||||
|
||||
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
|
||||
for sentIHave in p.sentIHaves.mitems():
|
||||
@@ -403,7 +419,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
|
||||
await p.rpcmessagequeue.sendPriorityQueue[^1]
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
|
||||
await p.sendMsg(msg)
|
||||
await p.sendMsg(msg.message, msg.msgId)
|
||||
|
||||
proc startSendNonPriorityTask(p: PubSubPeer) =
|
||||
debug "starting sendNonPriorityTask", p
|
||||
@@ -424,7 +440,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
|
||||
proc new(T: typedesc[RpcMessageQueue]): T =
|
||||
return T(
|
||||
sendPriorityQueue: initDeque[Future[void]](),
|
||||
nonPriorityQueue: newAsyncQueue[seq[byte]](),
|
||||
nonPriorityQueue: newAsyncQueue[MessageWithId](),
|
||||
)
|
||||
|
||||
proc new*(
|
||||
|
||||
Reference in New Issue
Block a user