mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 12:58:05 -05:00
Compare commits
4 Commits
more-debug
...
research_g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b85d34d558 | ||
|
|
cb3bde2a9b | ||
|
|
b8cab24ef0 | ||
|
|
d72281dc62 |
@@ -36,6 +36,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
|
||||
|
||||
export types, scoring, behavior, pubsub
|
||||
|
||||
import std/atomics

# Number of received messages to ignore before the experiment counters start
# counting for real: once the warmup counter reaches this value all counters
# below are reset (see the warmup logic in validateAndRelay).  0 disables
# the warmup phase entirely.
const WARMUP_THRESHOLD = 0

# Module-wide experiment counters ("lma_" prefix).  Atomic[uint32] so they
# can be bumped from any thread without a lock.  They are marked with `*`
# export markers because Nim's `export` statement only forwards symbols of
# *imported* modules and cannot be applied to vars declared here; the `*`
# form also exposes lma_warmup_messages, which the old export list omitted.
var
  lma_dup_during_validation*: Atomic[uint32] # duplicates that arrived while the first copy was still being validated
  lma_idontwant_saves*: Atomic[uint32]       # transmissions avoided thanks to IDONTWANT
  lma_duplicate_count*: Atomic[uint32]       # duplicate messages received
  lma_iwants_sent*: Atomic[uint32]           # IWANT requests sent
  lma_iwants_replied*: Atomic[uint32]        # IWANT requests we replied to
  lma_imreceiving_saves*: Atomic[uint32]     # messages saved due to IMRECEIVING
  lma_unique_receives*: Atomic[uint32]       # unique messages received
  # NOTE(review): identifier misspells "after"; kept as-is because other
  # parts of the module reference it by this exact name.
  lma_mesh_recvs_aftar_iwant*: Atomic[uint32] # mesh deliveries that arrived after an IWANT was already sent
  lma_warmup_messages*: Atomic[uint32]       # messages seen so far while warming up (< WARMUP_THRESHOLD)

# Module-level globals are zero-initialized in Nim, but reset them explicitly
# so the starting state is obvious to the reader.
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)
lma_warmup_messages.store(0)
|
||||
|
||||
logScope:
|
||||
topics = "libp2p gossipsub"
|
||||
|
||||
@@ -352,6 +378,13 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
respControl.prune.add(g.handleGraft(peer, control.graft))
|
||||
let messages = g.handleIWant(peer, control.iwant)
|
||||
|
||||
let ineedRequests = g.handleIAnnounce(peer, control.iannounce)
|
||||
if ineedRequests.messageIDs.len > 0:
|
||||
respControl.ineed.add(ineedRequests)
|
||||
let ineedReplies = g.handleINeed(peer, control.ineed)
|
||||
if ineedReplies.len > 0:
|
||||
g.send(peer, RPCMsg(messages: ineedReplies), isHighPriority = true)
|
||||
|
||||
let
|
||||
isPruneNotEmpty = respControl.prune.len > 0
|
||||
isIWantNotEmpty = respControl.iwant.len > 0
|
||||
@@ -359,6 +392,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
if isPruneNotEmpty or isIWantNotEmpty:
|
||||
if isIWantNotEmpty:
|
||||
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
|
||||
lma_iwants_sent.atomicInc(respControl.iwant.len.uint32)
|
||||
|
||||
if isPruneNotEmpty:
|
||||
for prune in respControl.prune:
|
||||
@@ -380,6 +414,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
|
||||
# iwant replies have lower priority
|
||||
trace "sending iwant reply messages", peer
|
||||
lma_iwants_replied.atomicInc(messages.len.uint32)
|
||||
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
|
||||
|
||||
proc sendIDontWant(
|
||||
@@ -433,15 +468,29 @@ proc validateAndRelay(
|
||||
toSendPeers.excl(peer)
|
||||
|
||||
if isLargeMessage(msg, msgId):
|
||||
var peersToSendIDontWant = HashSet[PubSubPeer]()
|
||||
addToSendPeers(peersToSendIDontWant)
|
||||
g.sendIDontWant(msg, msgId, peersToSendIDontWant)
|
||||
if lma_warmup_messages.load() < WARMUP_THRESHOLD:
|
||||
lma_warmup_messages.atomicInc()
|
||||
if lma_warmup_messages.load() == WARMUP_THRESHOLD:
|
||||
lma_dup_during_validation.store(0)
|
||||
lma_idontwant_saves.store(0)
|
||||
lma_duplicate_count.store(0)
|
||||
lma_iwants_sent.store(0)
|
||||
lma_iwants_replied.store(0)
|
||||
lma_imreceiving_saves.store(0)
|
||||
lma_unique_receives.store(0)
|
||||
lma_mesh_recvs_aftar_iwant.store(0)
|
||||
else:
|
||||
var peersToSendIDontWant = HashSet[PubSubPeer]()
|
||||
addToSendPeers(peersToSendIDontWant)
|
||||
g.sendIDontWant(msg, msgId, peersToSendIDontWant)
|
||||
|
||||
|
||||
let validation = await g.validate(msg)
|
||||
|
||||
var seenPeers: HashSet[PubSubPeer]
|
||||
discard g.validationSeen.pop(saltedId, seenPeers)
|
||||
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
|
||||
lma_dup_during_validation.atomicInc(seenPeers.len.uint32)
|
||||
libp2p_gossipsub_saved_bytes.inc(
|
||||
(msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]
|
||||
)
|
||||
@@ -484,15 +533,30 @@ proc validateAndRelay(
|
||||
libp2p_gossipsub_saved_bytes.inc(
|
||||
msg.data.len.int64, labelValues = ["idontwant"]
|
||||
)
|
||||
lma_idontwant_saves.atomicInc()
|
||||
return true
|
||||
return false
|
||||
|
||||
toSendPeers.exclIfIt(isMsgInIdontWant(it))
|
||||
var peersList = toSeq(toSendPeers)
|
||||
g.rng.shuffle(peersList)
|
||||
|
||||
if peersList.len > 0:
|
||||
toSendPeers.excl(peersList[0])
|
||||
g.broadcast(@[peersList[0]], RPCMsg(messages: @[msg]), isHighPriority = false)
|
||||
|
||||
g.broadcast(toSendPeers,
|
||||
RPCMsg(control: some(ControlMessage(iannounce: @[ControlIHave(messageIDs: @[msgId])]))),
|
||||
isHighPriority = true)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# In theory, if topics are the same in all messages, we could batch - we'd
|
||||
# also have to be careful to only include validated messages
|
||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
|
||||
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
|
||||
#g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
|
||||
#trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
|
||||
|
||||
if g.knownTopics.contains(topic):
|
||||
libp2p_pubsub_messages_rebroadcasted.inc(
|
||||
@@ -616,11 +680,13 @@ method rpcHandler*(
|
||||
g.rewardDelivered(peer, topic, false, delay)
|
||||
|
||||
libp2p_gossipsub_duplicate.inc()
|
||||
lma_duplicate_count.atomicInc()
|
||||
|
||||
# onto the next message
|
||||
continue
|
||||
|
||||
libp2p_gossipsub_received.inc()
|
||||
lma_unique_receives.atomicInc()
|
||||
|
||||
# avoid processing messages we are not interested in
|
||||
if topic notin g.topics:
|
||||
|
||||
@@ -293,8 +293,9 @@ proc handleIHave*(
|
||||
if not g.hasSeen(g.salt(msgId)):
|
||||
if peer.iHaveBudget <= 0:
|
||||
break
|
||||
elif msgId notin res.messageIDs:
|
||||
elif msgId notin res.messageIDs and msgId notin g.weSentIneeds and msgId notin g.weSentIhaves:
|
||||
res.messageIDs.add(msgId)
|
||||
g.weSentIhaves.incl(msgId)
|
||||
dec peer.iHaveBudget
|
||||
trace "requested message via ihave", messageID = msgId
|
||||
# shuffling res.messageIDs before sending it out to increase the likelihood
|
||||
@@ -302,6 +303,39 @@ proc handleIHave*(
|
||||
g.rng.shuffle(res.messageIDs)
|
||||
return res
|
||||
|
||||
proc handleIAnnounce*(
    g: GossipSub, peer: PubSubPeer, iannounces: seq[ControlIHave]
): ControlIWant =
  ## Builds the INEED reply (a ControlIWant) for a batch of IANNOUNCE
  ## control entries: requests every announced message id that belongs to a
  ## topic we subscribe to, that we have not seen, and that we have not
  ## already requested.  Each id placed in the reply is also recorded in
  ## ``g.weSentIneeds`` so it is never requested twice.
  for announcement in iannounces:
    # Ignore announcements for topics we are not subscribed to.
    if announcement.topicID notin g.topics:
      continue
    for mid in announcement.messageIDs:
      # Skip ids we have already seen...
      if g.hasSeen(g.salt(mid)):
        continue
      # ...or already queued in this reply, or requested earlier.
      if mid in result.messageIDs or mid in g.weSentIneeds:
        continue
      result.messageIDs.add(mid)
      g.weSentIneeds.incl(mid)
|
||||
|
||||
proc handleINeed*(
    g: GossipSub, peer: PubSubPeer, ineeds: seq[ControlIWant]
): seq[Message] =
  ## Answers INEED requests with full messages from the message cache.
  ## Duplicate ids within the batch are collapsed, and each id is replied to
  ## at most once per peer (tracked in ``peer.weRepliedIneeds``).  Ids that
  ## are no longer cached are silently dropped (best-effort reply).
  # Pass 1: collect unique, not-yet-replied ids in arrival order.
  var wanted: seq[MessageId]
  for request in ineeds:
    for mid in request.messageIDs:
      if mid notin peer.weRepliedIneeds and mid notin wanted:
        wanted.add(mid)
  # Pass 2: resolve each id against the cache and record the reply.
  for mid in wanted:
    let cached = g.mcache.get(mid).valueOr:
      continue
    result.add(cached)
    peer.weRepliedIneeds.incl(mid)
|
||||
|
||||
|
||||
proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWant]) =
|
||||
for dontWant in iDontWants:
|
||||
for messageId in dontWant.messageIDs:
|
||||
|
||||
@@ -190,6 +190,8 @@ type
|
||||
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
|
||||
|
||||
heartbeatEvents*: seq[AsyncEvent]
|
||||
weSentIneeds*: HashSet[MessageId]
|
||||
weSentIhaves*: HashSet[MessageId]
|
||||
|
||||
MeshMetrics* = object # scratch buffers for metrics
|
||||
otherPeersPerTopicMesh*: int64
|
||||
|
||||
@@ -645,7 +645,7 @@ proc init*[PubParams: object | bool](
|
||||
sign: bool = true,
|
||||
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
|
||||
subscriptionValidator: SubscriptionValidator = nil,
|
||||
maxMessageSize: int = 1024 * 1024,
|
||||
maxMessageSize: int = 1450 * 1450, #Support upto 2MB
|
||||
rng: ref HmacDrbgContext = newRng(),
|
||||
parameters: PubParams = false,
|
||||
): P {.raises: [InitializationError], public.} =
|
||||
|
||||
@@ -112,6 +112,8 @@ type
|
||||
## IDONTWANT contains unvalidated message id:s which may be long and/or
|
||||
## expensive to look up, so we apply the same salting to them as during
|
||||
## unvalidated message processing
|
||||
heAnnounced*:Deque[HashSet[MessageId]]
|
||||
weRepliedIneeds*: HashSet[MessageId]
|
||||
iHaveBudget*: int
|
||||
pingBudget*: int
|
||||
maxMessageSize: int
|
||||
@@ -566,4 +568,5 @@ proc new*(
|
||||
)
|
||||
result.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||
result.iDontWants.addFirst(default(HashSet[SaltedId]))
|
||||
result.heAnnounced.addFirst(default(HashSet[MessageId]))
|
||||
result.startSendNonPriorityTask()
|
||||
|
||||
@@ -63,6 +63,8 @@ type
|
||||
graft*: seq[ControlGraft]
|
||||
prune*: seq[ControlPrune]
|
||||
idontwant*: seq[ControlIWant]
|
||||
iannounce*: seq[ControlIHave]
|
||||
ineed*: seq[ControlIWant]
|
||||
|
||||
ControlIHave* = object
|
||||
topicID*: string
|
||||
@@ -173,11 +175,13 @@ proc byteSize(controlPrune: ControlPrune): int =
|
||||
# 8 bytes for uint64
|
||||
|
||||
static:
|
||||
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
|
||||
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "iannounce", "ineed"])
|
||||
proc byteSize(control: ControlMessage): int =
|
||||
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
|
||||
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
|
||||
control.idontwant.foldl(a + b.byteSize, 0)
|
||||
control.idontwant.foldl(a + b.byteSize, 0) +
|
||||
control.iannounce.foldl(a + b.byteSize, 0) +
|
||||
control.ineed.foldl(a + b.byteSize, 0)
|
||||
|
||||
static:
|
||||
expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])
|
||||
|
||||
@@ -89,6 +89,10 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
|
||||
ipb.write(4, prune)
|
||||
for idontwant in control.idontwant:
|
||||
ipb.write(5, idontwant)
|
||||
for iannounce in control.iannounce:
|
||||
ipb.write(6, iannounce)
|
||||
for ineed in control.ineed:
|
||||
ipb.write(7, ineed)
|
||||
if len(ipb.buffer) > 0:
|
||||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
@@ -208,6 +212,8 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
|
||||
var graftpbs: seq[seq[byte]]
|
||||
var prunepbs: seq[seq[byte]]
|
||||
var idontwant: seq[seq[byte]]
|
||||
var iannounce: seq[seq[byte]]
|
||||
var ineed: seq[seq[byte]]
|
||||
if ?cpb.getRepeatedField(1, ihavepbs):
|
||||
for item in ihavepbs:
|
||||
control.ihave.add(?decodeIHave(initProtoBuffer(item)))
|
||||
@@ -223,6 +229,12 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
|
||||
if ?cpb.getRepeatedField(5, idontwant):
|
||||
for item in idontwant:
|
||||
control.idontwant.add(?decodeIWant(initProtoBuffer(item)))
|
||||
if ?cpb.getRepeatedField(6, iannounce):
|
||||
for item in iannounce:
|
||||
control.iannounce.add(?decodeIHave(initProtoBuffer(item)))
|
||||
if ?cpb.getRepeatedField(7, ineed):
|
||||
for item in ineed:
|
||||
control.ineed.add(?decodeIWant(initProtoBuffer(item)))
|
||||
trace "decodeControl: message statistics",
|
||||
graft_count = len(control.graft),
|
||||
prune_count = len(control.prune),
|
||||
|
||||
Reference in New Issue
Block a user