Compare commits

...

8 Commits

Author SHA1 Message Date
ufarooqstatus
05446734e3 corrected dup_during_validation count 2025-01-04 21:18:25 +05:00
ufarooqstatus
64ef502c87 Merge branch 'master' into lma_merge_imreceiving_iwant_1
updating local branch
2025-01-04 02:13:02 +05:00
ufarooqstatus
b2a75fc25e we make only one iwant request 2024-10-01 02:45:38 +05:00
ufarooqstatus
4b691b6374 set num_finds to 1 2024-09-29 23:07:42 +05:00
ufarooqstatus
8bb6215d8a imreceiving handling merged with iwant optimization 2024-09-27 03:27:12 +05:00
ufarooqstatus
f1b78f6be6 IMReceiving message added 2024-09-27 00:17:18 +05:00
ufarooqstatus
8377eb0362 stats places, warmup messages added 2024-09-24 23:19:49 +05:00
ufarooqstatus
35d1876ad8 added stats counters, still to check message receives from mesh after issuing iwant 2024-09-22 23:31:42 +05:00
6 changed files with 136 additions and 18 deletions

View File

@@ -36,6 +36,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
export types, scoring, behavior, pubsub
import std/atomics
const WARMUP_THRESHOLD = 2
var
lma_dup_during_validation: Atomic[uint32] # number of duplicates during 1st message validation
lma_idontwant_saves: Atomic[uint32] # number of Txs saved due to idontwant
lma_duplicate_count: Atomic[uint32] # number of duplicate messages received
lma_iwants_sent: Atomic[uint32] # number of iwant requests sent
lma_iwants_replied: Atomic[uint32] # number of iwant messages that are replied
lma_imreceiving_saves: Atomic[uint32] # number of messages saved due to imreceiving message
lma_unique_receives: Atomic[uint32] # number of unique messages received
lma_mesh_recvs_aftar_iwant: Atomic[uint32] # messages received from mesh, after sending iwant request
lma_warmup_messages: Atomic[uint32] # dont issue idontwant during if < WARMUP_THRESHOLD
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)
lma_warmup_messages.store(0)
export lma_dup_during_validation, lma_idontwant_saves, lma_duplicate_count, lma_iwants_sent,
lma_iwants_replied, lma_imreceiving_saves, lma_unique_receives, lma_mesh_recvs_aftar_iwant
logScope:
topics = "libp2p gossipsub"
@@ -226,6 +252,7 @@ method init*(g: GossipSub) =
g.codecs &= GossipSubCodec_12
g.codecs &= GossipSubCodec_11
g.codecs &= GossipSubCodec_10
g.iwantsRequested = initHashSet[MessageId]()
method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
g.withPeerStats(peer.peerId) do(stats: var PeerStats):
@@ -347,6 +374,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
var respControl: ControlMessage
g.handleIDontWant(peer, control.idontwant)
g.handleIMReceiving(peer, control.imreceiving)
let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIDs.len > 0:
respControl.iwant.add(iwant)
@@ -360,6 +388,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
if isPruneNotEmpty or isIWantNotEmpty:
if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
lma_iwants_sent.atomicInc(respControl.iwant.len.uint32)
if isPruneNotEmpty:
for prune in respControl.prune:
@@ -381,6 +410,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
# iwant replies have lower priority
trace "sending iwant reply messages", peer
lma_iwants_replied.atomicInc(messages.len.uint32)
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
proc validateAndRelay(
@@ -397,6 +427,8 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
g.subscribedDirectPeers.withValue(topic, peers):
toSendPeers.incl(peers[])
if not (peer in toSendPeers):
lma_mesh_recvs_aftar_iwant.atomicInc()
toSendPeers.excl(peer)
if msg.data.len > max(512, msgId.len * 10):
@@ -409,25 +441,39 @@ proc validateAndRelay(
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
if lma_warmup_messages.load() < WARMUP_THRESHOLD:
lma_warmup_messages.atomicInc()
if lma_warmup_messages.load() == WARMUP_THRESHOLD:
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)
else:
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
let validation = await g.validate(msg)
var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(saltedId, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
lma_dup_during_validation.atomicInc(seenPeers.len.uint32)
libp2p_gossipsub_saved_bytes.inc(
(msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]
)
@@ -463,6 +509,17 @@ proc validateAndRelay(
# Don't send it to peers that sent it during validation
toSendPeers.excl(seenPeers)
#We have received IMReceiving from these peers, We should not exclude them
#Ideally we should wait (TxTime + large safety cushion) before sending to these peers
var receivingPeers: HashSet[PubSubPeer]
for pr in toSendPeers:
for heIsReceiving in pr.heIsReceivings:
if msgId in heIsReceiving:
receivingPeers.incl(pr)
break
toSendPeers.excl(receivingPeers)
lma_imreceiving_saves.atomicInc(receivingPeers.len.uint32)
proc isMsgInIdontWant(it: PubSubPeer): bool =
for iDontWant in it.iDontWants:
if saltedId in iDontWant:
@@ -470,6 +527,7 @@ proc validateAndRelay(
libp2p_gossipsub_saved_bytes.inc(
msg.data.len.int64, labelValues = ["idontwant"]
)
lma_idontwant_saves.atomicInc()
return true
return false
@@ -596,11 +654,13 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
g.rewardDelivered(peer, topic, false, delay)
libp2p_gossipsub_duplicate.inc()
lma_duplicate_count.atomicInc()
# onto the next message
continue
libp2p_gossipsub_received.inc()
lma_unique_receives.atomicInc()
# avoid processing messages we are not interested in
if topic notin g.topics:

View File

@@ -290,14 +290,31 @@ proc handleIHave*(
for ihave in ihaves:
trace "peer sent ihave", peer, topicID = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.topics:
#look here for receieved idontwants for the same message
var meshPeers: HashSet[PubSubPeer]
g.mesh.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
g.subscribedDirectPeers.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
for msgId in ihave.messageIDs:
if not g.hasSeen(g.salt(msgId)):
if peer.iHaveBudget <= 0:
break
elif msgId in g.iwantsRequested:
break
elif msgId notin res.messageIDs:
res.messageIDs.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID = msgId
#dont send IWANT if we have received (N number of) IDontWant(s) for a msgID
let saltedID = g.salt(msgId)
var numFinds: int = 0
for meshPeer in meshPeers:
for heDontWant in meshPeer.iDontWants:
if saltedID in heDontWant:
numFinds = numFinds + 1
#break;
if numFinds == 0: #We currently wait for 1 IDontWants
res.messageIDs.add(msgId)
dec peer.iHaveBudget
g.iwantsRequested.incl(msgId)
trace "requested message via ihave", messageID = msgId
# shuffling res.messageIDs before sending it out to increase the likelihood
# of getting an answer if the peer truncates the list due to internal size restrictions.
g.rng.shuffle(res.messageIDs)
@@ -309,6 +326,35 @@ proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWa
if peer.iDontWants[^1].len > 1000:
break
peer.iDontWants[^1].incl(g.salt(messageId))
#Experimental change for quick performance evaluation only (Ideally for very large messages):
#[
1) IDontWant is followed by the message. IMReceiving informs peers that we are receiving this message
2) Prototype implementation for a single topic ("test"). Need topic ID in IDontWant
3) Better solution is to send Message detail in a message preamble, That can be used for IMReceiving
]#
var toSendPeers = HashSet[PubSubPeer]()
g.floodsub.withValue("test", peers): toSendPeers.incl(peers[])
g.mesh.withValue("test", peers): toSendPeers.incl(peers[])
# add direct peers
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault("test"))
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
imreceiving: @[ControlIWant(messageIDs: @[messageId])]
))), isHighPriority = true)
proc handleIMReceiving*(g: GossipSub,
peer: PubSubPeer,
imreceivings: seq[ControlIWant]) =
for imreceiving in imreceivings:
for messageId in imreceiving.messageIDs:
if peer.heIsReceivings[^1].len > 1000: break
if messageId.len > 100: continue
peer.heIsReceivings[^1].incl(messageId)
proc handleIWant*(
g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]

View File

@@ -188,6 +188,8 @@ type
heartbeatEvents*: seq[AsyncEvent]
iwantsRequested*: HashSet[MessageId]
MeshMetrics* = object # scratch buffers for metrics
otherPeersPerTopicMesh*: int64
otherPeersPerTopicFanout*: int64

View File

@@ -109,6 +109,7 @@ type
## IDONTWANT contains unvalidated message id:s which may be long and/or
## expensive to look up, so we apply the same salting to them as during
## unvalidated message processing
heIsReceivings*:Deque[HashSet[MessageId]]
iHaveBudget*: int
pingBudget*: int
maxMessageSize: int
@@ -557,4 +558,5 @@ proc new*(
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.iDontWants.addFirst(default(HashSet[SaltedId]))
result.heIsReceivings.addFirst(default(HashSet[MessageId]))
result.startSendNonPriorityTask()

View File

@@ -63,6 +63,7 @@ type
graft*: seq[ControlGraft]
prune*: seq[ControlPrune]
idontwant*: seq[ControlIWant]
imreceiving*: seq[ControlIWant]
ControlIHave* = object
topicID*: string
@@ -173,11 +174,12 @@ proc byteSize(controlPrune: ControlPrune): int =
# 8 bytes for uint64
static:
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "imreceiving"])
proc byteSize(control: ControlMessage): int =
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
control.idontwant.foldl(a + b.byteSize, 0)
control.idontwant.foldl(a + b.byteSize, 0) +
control.imreceiving.foldl(a + b.byteSize, 0)
static:
expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])

View File

@@ -89,6 +89,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(4, prune)
for idontwant in control.idontwant:
ipb.write(5, idontwant)
for imreceiving in control.imreceiving:
ipb.write(6, imreceiving)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
@@ -208,6 +210,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
var graftpbs: seq[seq[byte]]
var prunepbs: seq[seq[byte]]
var idontwant: seq[seq[byte]]
var imreceiving: seq[seq[byte]]
if ?cpb.getRepeatedField(1, ihavepbs):
for item in ihavepbs:
control.ihave.add(?decodeIHave(initProtoBuffer(item)))
@@ -223,6 +226,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
if ?cpb.getRepeatedField(5, idontwant):
for item in idontwant:
control.idontwant.add(?decodeIWant(initProtoBuffer(item)))
if ? cpb.getRepeatedField(6, imreceiving):
for item in imreceiving:
control.imreceiving.add(?decodeIWant(initProtoBuffer(item)))
trace "decodeControl: message statistics",
graft_count = len(control.graft),
prune_count = len(control.prune),