Compare commits

...

4 Commits

Author SHA1 Message Date
ufarooqstatus
b85d34d558 GossipSub v2.0 minimal implementation, with D_announce = D-1 2025-05-28 02:42:43 +05:00
ufarooqstatus
cb3bde2a9b corrected dup_during_validation count 2025-05-12 09:21:54 +05:00
ufarooqstatus
b8cab24ef0 rebased to master, stats places, warmup messages added 2025-05-12 09:18:19 +05:00
ufarooqstatus
d72281dc62 added stats counters, still to check message receives from mesh after issuing iwant 2025-05-12 08:32:07 +05:00
7 changed files with 130 additions and 9 deletions

View File

@@ -36,6 +36,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
export types, scoring, behavior, pubsub
import std/atomics
const WARMUP_THRESHOLD = 0
var
lma_dup_during_validation: Atomic[uint32] # number of duplicates during 1st message validation
lma_idontwant_saves: Atomic[uint32] # number of Txs saved due to idontwant
lma_duplicate_count: Atomic[uint32] # number of duplicate messages received
lma_iwants_sent: Atomic[uint32] # number of iwant requests sent
lma_iwants_replied: Atomic[uint32] # number of iwant messages that are replied
lma_imreceiving_saves: Atomic[uint32] # number of messages saved due to imreceiving message
lma_unique_receives: Atomic[uint32] # number of unique messages received
lma_mesh_recvs_aftar_iwant: Atomic[uint32] # messages received from mesh, after sending iwant request
lma_warmup_messages: Atomic[uint32] # dont issue idontwant during if < WARMUP_THRESHOLD
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)
lma_warmup_messages.store(0)
export lma_dup_during_validation, lma_idontwant_saves, lma_duplicate_count, lma_iwants_sent,
lma_iwants_replied, lma_imreceiving_saves, lma_unique_receives, lma_mesh_recvs_aftar_iwant
logScope:
topics = "libp2p gossipsub"
@@ -352,6 +378,13 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)
let ineedRequests = g.handleIAnnounce(peer, control.iannounce)
if ineedRequests.messageIDs.len > 0:
respControl.ineed.add(ineedRequests)
let ineedReplies = g.handleINeed(peer, control.ineed)
if ineedReplies.len > 0:
g.send(peer, RPCMsg(messages: ineedReplies), isHighPriority = true)
let
isPruneNotEmpty = respControl.prune.len > 0
isIWantNotEmpty = respControl.iwant.len > 0
@@ -359,6 +392,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
if isPruneNotEmpty or isIWantNotEmpty:
if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
lma_iwants_sent.atomicInc(respControl.iwant.len.uint32)
if isPruneNotEmpty:
for prune in respControl.prune:
@@ -380,6 +414,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
# iwant replies have lower priority
trace "sending iwant reply messages", peer
lma_iwants_replied.atomicInc(messages.len.uint32)
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
proc sendIDontWant(
@@ -433,15 +468,29 @@ proc validateAndRelay(
toSendPeers.excl(peer)
if isLargeMessage(msg, msgId):
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
g.sendIDontWant(msg, msgId, peersToSendIDontWant)
if lma_warmup_messages.load() < WARMUP_THRESHOLD:
lma_warmup_messages.atomicInc()
if lma_warmup_messages.load() == WARMUP_THRESHOLD:
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)
else:
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
g.sendIDontWant(msg, msgId, peersToSendIDontWant)
let validation = await g.validate(msg)
var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(saltedId, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
lma_dup_during_validation.atomicInc(seenPeers.len.uint32)
libp2p_gossipsub_saved_bytes.inc(
(msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]
)
@@ -484,15 +533,30 @@ proc validateAndRelay(
libp2p_gossipsub_saved_bytes.inc(
msg.data.len.int64, labelValues = ["idontwant"]
)
lma_idontwant_saves.atomicInc()
return true
return false
toSendPeers.exclIfIt(isMsgInIdontWant(it))
var peersList = toSeq(toSendPeers)
g.rng.shuffle(peersList)
if peersList.len > 0:
toSendPeers.excl(peersList[0])
g.broadcast(@[peersList[0]], RPCMsg(messages: @[msg]), isHighPriority = false)
g.broadcast(toSendPeers,
RPCMsg(control: some(ControlMessage(iannounce: @[ControlIHave(messageIDs: @[msgId])]))),
isHighPriority = true)
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
#g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
#trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(
@@ -616,11 +680,13 @@ method rpcHandler*(
g.rewardDelivered(peer, topic, false, delay)
libp2p_gossipsub_duplicate.inc()
lma_duplicate_count.atomicInc()
# onto the next message
continue
libp2p_gossipsub_received.inc()
lma_unique_receives.atomicInc()
# avoid processing messages we are not interested in
if topic notin g.topics:

View File

@@ -293,8 +293,9 @@ proc handleIHave*(
if not g.hasSeen(g.salt(msgId)):
if peer.iHaveBudget <= 0:
break
elif msgId notin res.messageIDs:
elif msgId notin res.messageIDs and msgId notin g.weSentIneeds and msgId notin g.weSentIhaves:
res.messageIDs.add(msgId)
g.weSentIhaves.incl(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID = msgId
# shuffling res.messageIDs before sending it out to increase the likelihood
@@ -302,6 +303,39 @@ proc handleIHave*(
g.rng.shuffle(res.messageIDs)
return res
proc handleIAnnounce*(
g: GossipSub, peer: PubSubPeer, iannounces: seq[ControlIHave]
): ControlIWant =
var res: ControlIWant
for iannounce in iannounces:
if iannounce.topicID in g.topics:
for msgId in iannounce.messageIDs:
if not g.hasSeen(g.salt(msgId)):
if msgId notin res.messageIDs:
if msgId notin g.weSentIneeds:
res.messageIDs.add(msgId)
g.weSentIneeds.incl(msgId)
return res
proc handleINeed*(
g: GossipSub, peer: PubSubPeer, ineeds: seq[ControlIWant]
): seq[Message] =
var messages: seq[Message]
var requests: seq[MessageId]
for ineed in ineeds:
for mid in ineed.messageIDs:
if mid in peer.weRepliedIneeds:
continue
if mid notin requests:
requests.add(mid)
for request in requests:
let msg = g.mcache.get(request).valueOr:
continue
messages.add(msg)
peer.weRepliedIneeds.incl(request)
return messages
proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWant]) =
for dontWant in iDontWants:
for messageId in dontWant.messageIDs:

View File

@@ -190,6 +190,8 @@ type
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
heartbeatEvents*: seq[AsyncEvent]
weSentIneeds*: HashSet[MessageId]
weSentIhaves*: HashSet[MessageId]
MeshMetrics* = object # scratch buffers for metrics
otherPeersPerTopicMesh*: int64

View File

@@ -645,7 +645,7 @@ proc init*[PubParams: object | bool](
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
maxMessageSize: int = 1024 * 1024,
maxMessageSize: int = 1450 * 1450, #Support upto 2MB
rng: ref HmacDrbgContext = newRng(),
parameters: PubParams = false,
): P {.raises: [InitializationError], public.} =

View File

@@ -112,6 +112,8 @@ type
## IDONTWANT contains unvalidated message id:s which may be long and/or
## expensive to look up, so we apply the same salting to them as during
## unvalidated message processing
heAnnounced*:Deque[HashSet[MessageId]]
weRepliedIneeds*: HashSet[MessageId]
iHaveBudget*: int
pingBudget*: int
maxMessageSize: int
@@ -566,4 +568,5 @@ proc new*(
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.iDontWants.addFirst(default(HashSet[SaltedId]))
result.heAnnounced.addFirst(default(HashSet[MessageId]))
result.startSendNonPriorityTask()

View File

@@ -63,6 +63,8 @@ type
graft*: seq[ControlGraft]
prune*: seq[ControlPrune]
idontwant*: seq[ControlIWant]
iannounce*: seq[ControlIHave]
ineed*: seq[ControlIWant]
ControlIHave* = object
topicID*: string
@@ -173,11 +175,13 @@ proc byteSize(controlPrune: ControlPrune): int =
# 8 bytes for uint64
static:
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "iannounce", "ineed"])
proc byteSize(control: ControlMessage): int =
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
control.idontwant.foldl(a + b.byteSize, 0)
control.idontwant.foldl(a + b.byteSize, 0) +
control.iannounce.foldl(a + b.byteSize, 0) +
control.ineed.foldl(a + b.byteSize, 0)
static:
expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])

View File

@@ -89,6 +89,10 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(4, prune)
for idontwant in control.idontwant:
ipb.write(5, idontwant)
for iannounce in control.iannounce:
ipb.write(6, iannounce)
for ineed in control.ineed:
ipb.write(7, ineed)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
@@ -208,6 +212,8 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
var graftpbs: seq[seq[byte]]
var prunepbs: seq[seq[byte]]
var idontwant: seq[seq[byte]]
var iannounce: seq[seq[byte]]
var ineed: seq[seq[byte]]
if ?cpb.getRepeatedField(1, ihavepbs):
for item in ihavepbs:
control.ihave.add(?decodeIHave(initProtoBuffer(item)))
@@ -223,6 +229,12 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
if ?cpb.getRepeatedField(5, idontwant):
for item in idontwant:
control.idontwant.add(?decodeIWant(initProtoBuffer(item)))
if ?cpb.getRepeatedField(6, iannounce):
for item in iannounce:
control.iannounce.add(?decodeIHave(initProtoBuffer(item)))
if ?cpb.getRepeatedField(7, ineed):
for item in ineed:
control.ineed.add(?decodeIWant(initProtoBuffer(item)))
trace "decodeControl: message statistics",
graft_count = len(control.graft),
prune_count = len(control.prune),