Compare commits

...

15 Commits

Author SHA1 Message Date
ufarooqstatus ac9e1e603c some coments added 2025-01-10 17:29:35 +05:00
ufarooqstatus e779185b0c preamble added in IWANT replies also 2025-01-07 00:17:02 +05:00
ufarooqstatus 0acd8be302 topicID added in preamble 2025-01-06 17:54:50 +05:00
ufarooqstatus 58470a78e4 sem is one (sequential send in staggering) 2025-01-06 02:15:34 +05:00
ufarooqstatus 43f1f0abfb Added control preamble, and staggering. Still need to add preamble for iwants 2025-01-05 18:52:11 +05:00
ufarooqstatus 0677ab4952 stagger send during publish and validateAndRelay. Still need to add timeouts 2025-01-05 15:05:31 +05:00
ufarooqstatus ab0feb3772 staggering added in message relay, still need to adjust timeouts 2025-01-05 02:30:11 +05:00
ufarooqstatus 1fe2bae234 corrected dup_during_validation count 2025-01-04 20:44:07 +05:00
ufarooqstatus 64ef502c87 Merge branch 'master' into lma_merge_imreceiving_iwant_1 (updating local branch) 2025-01-04 02:13:02 +05:00
ufarooqstatus b2a75fc25e we make only one iwant request 2024-10-01 02:45:38 +05:00
ufarooqstatus 4b691b6374 set num_finds to 1 2024-09-29 23:07:42 +05:00
ufarooqstatus 8bb6215d8a imreceiving handling merged with iwant optimization 2024-09-27 03:27:12 +05:00
ufarooqstatus f1b78f6be6 IMReceiving message added 2024-09-27 00:17:18 +05:00
ufarooqstatus 8377eb0362 stats places, warmup messages added 2024-09-24 23:19:49 +05:00
ufarooqstatus 35d1876ad8 added stats counters, still to check message receives from mesh after issuing iwant 2024-09-22 23:31:42 +05:00
6 changed files with 212 additions and 26 deletions
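The compare introduces two control-message extensions: a preamble announcing that the full message for the listed ids is about to be sent on this connection, and an imreceiving telling mesh peers that the sender is already downloading those ids and does not need them relayed. Below is a minimal sketch (not part of the diff) that mirrors the new fields with simplified local types; the real types are ControlIHave and ControlIWant in rpc/messages.nim, and the wire field numbers 6 and 7 come from the protobuf hunk further down.

type
  Preamble = object          # mirrors ControlIHave: topic plus announced message ids
    topicID: string
    messageIDs: seq[string]
  IMReceiving = object       # mirrors ControlIWant: just the message ids
    messageIDs: seq[string]
  ControlExt = object        # the two fields added to ControlMessage (wire fields 6 and 7)
    preamble: seq[Preamble]
    imreceiving: seq[IMReceiving]

let ext = ControlExt(
  preamble: @[Preamble(topicID: "test", messageIDs: @["msg-1"])],
  imreceiving: @[IMReceiving(messageIDs: @["msg-1"])],
)
echo ext.preamble.len, " preamble(s), ", ext.imreceiving.len, " imreceiving(s)"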

View File

@@ -24,6 +24,7 @@ import
./rpc/[messages, message, protobuf],
../protocol,
../../stream/connection,
../../utils/semaphore,
../../peerinfo,
../../peerid,
../../utility,
@@ -36,6 +37,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
export types, scoring, behavior, pubsub
import std/atomics
const WARMUP_THRESHOLD = 2
var
lma_dup_during_validation: Atomic[uint32] # number of duplicates during 1st message validation
lma_idontwant_saves: Atomic[uint32] # number of transmissions saved due to idontwant
lma_duplicate_count: Atomic[uint32] # number of duplicate messages received
lma_iwants_sent: Atomic[uint32] # number of iwant requests sent
lma_iwants_replied: Atomic[uint32] # number of iwant messages that are replied
lma_imreceiving_saves: Atomic[uint32] # number of messages saved due to imreceiving message
lma_unique_receives: Atomic[uint32] # number of unique messages received
lma_mesh_recvs_aftar_iwant: Atomic[uint32] # messages received from mesh, after sending iwant request
lma_warmup_messages: Atomic[uint32] # don't issue idontwant while fewer than WARMUP_THRESHOLD messages have been received (warm-up)
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)
lma_warmup_messages.store(0)
export lma_dup_during_validation, lma_idontwant_saves, lma_duplicate_count, lma_iwants_sent,
lma_iwants_replied, lma_imreceiving_saves, lma_unique_receives, lma_mesh_recvs_aftar_iwant
logScope:
topics = "libp2p gossipsub"
@@ -226,6 +253,7 @@ method init*(g: GossipSub) =
g.codecs &= GossipSubCodec_12
g.codecs &= GossipSubCodec_11
g.codecs &= GossipSubCodec_10
g.iwantsRequested = initHashSet[MessageId]()
method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
g.withPeerStats(peer.peerId) do(stats: var PeerStats):
@@ -347,11 +375,13 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
var respControl: ControlMessage
g.handleIDontWant(peer, control.idontwant)
g.handlePreamble(peer, control.preamble)
g.handleIMReceiving(peer, control.imreceiving)
let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIDs.len > 0:
respControl.iwant.add(iwant)
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)
let (messages, msgIDs) = g.handleIWant(peer, control.iwant)
let
isPruneNotEmpty = respControl.prune.len > 0
@@ -360,6 +390,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
if isPruneNotEmpty or isIWantNotEmpty:
if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
lma_iwants_sent.atomicInc(respControl.iwant.len.uint32)
if isPruneNotEmpty:
for prune in respControl.prune:
@@ -376,11 +407,17 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
let topic = smsg.topic
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
#We send the preamble first, so the peer sends IMReceiving to its mesh members
g.broadcast(@[peer], RPCMsg(control: some(ControlMessage(
preamble: @[ControlIHave(topicID: topic, messageIDs: msgIDs)]
))), isHighPriority = true)
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
# iwant replies have lower priority
trace "sending iwant reply messages", peer
lma_iwants_replied.atomicInc(messages.len.uint32)
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
proc validateAndRelay(
@@ -397,6 +434,8 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
g.subscribedDirectPeers.withValue(topic, peers):
toSendPeers.incl(peers[])
if not (peer in toSendPeers):
lma_mesh_recvs_aftar_iwant.atomicInc()
toSendPeers.excl(peer)
if msg.data.len > max(512, msgId.len * 10):
@@ -409,25 +448,41 @@ proc validateAndRelay(
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
#We don't count the first WARMUP_THRESHOLD messages in stats (they are for raising the cwnd)
if lma_warmup_messages.load() < WARMUP_THRESHOLD:
lma_warmup_messages.atomicInc()
if lma_warmup_messages.load() == WARMUP_THRESHOLD:
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)
else:
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
let validation = await g.validate(msg)
var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(saltedId, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
lma_dup_during_validation.atomicInc(seenPeers.len.uint32)
libp2p_gossipsub_saved_bytes.inc(
(msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]
)
@@ -463,6 +518,17 @@ proc validateAndRelay(
# Don't send it to peers that sent it during validation
toSendPeers.excl(seenPeers)
#We have received IMReceiving from these peers, so we exclude them from the immediate relay
#Ideally we should instead wait (TxTime + a large safety cushion) before sending to these peers
var receivingPeers: HashSet[PubSubPeer]
for pr in toSendPeers:
for heIsReceiving in pr.heIsReceivings:
if msgId in heIsReceiving:
receivingPeers.incl(pr)
break
toSendPeers.excl(receivingPeers)
lma_imreceiving_saves.atomicInc(receivingPeers.len.uint32)
proc isMsgInIdontWant(it: PubSubPeer): bool =
for iDontWant in it.iDontWants:
if saltedId in iDontWant:
@@ -470,6 +536,7 @@ proc validateAndRelay(
libp2p_gossipsub_saved_bytes.inc(
msg.data.len.int64, labelValues = ["idontwant"]
)
lma_idontwant_saves.atomicInc()
return true
return false
@@ -477,9 +544,31 @@ proc validateAndRelay(
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
#g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
let sem = newAsyncSemaphore(1)
var staggerPeers = toSeq(toSendPeers)
g.rng.shuffle(staggerPeers)
proc sendToOne(p: PubSubPeer) {.async.} =
g.broadcast(@[p], RPCMsg(control: some(ControlMessage(
preamble: @[ControlIHave(topicID: topic, messageIDs: @[msgId])]
))), isHighPriority = true)
#This won't add much delay, since we only populate the outgoing message queues (no timeouts needed)
#A small delay (near the average link latency) is sufficient for the IMReceiving messages
await sem.acquire()
defer: sem.release()
if isMsgInIdontWant(p):
return
g.broadcast(@[p], RPCMsg(messages: @[msg]), isHighPriority = false)
for p in staggerPeers:
asyncSpawn sendToOne(p)
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(
toSendPeers.len.int64, labelValues = [topic]
@@ -596,11 +685,13 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
g.rewardDelivered(peer, topic, false, delay)
libp2p_gossipsub_duplicate.inc()
lma_duplicate_count.atomicInc()
# onto the next message
continue
libp2p_gossipsub_received.inc()
lma_unique_receives.atomicInc()
# avoid processing messages we are not interested in
if topic notin g.topics:
@@ -782,7 +873,25 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy
g.mcache.put(msgId, msg)
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
#g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
let sem = newAsyncSemaphore(1)
var staggerPeers = toSeq(peers)
g.rng.shuffle(staggerPeers)
#We send the message immediately after sending the preamble to each peer
proc sendToOne(p: PubSubPeer) {.async.} =
g.broadcast(@[p], RPCMsg(control: some(ControlMessage(
preamble: @[ControlIHave(topicID: topic, messageIDs: @[msgId])]
))), isHighPriority = true)
await sem.acquire()
defer: sem.release()
g.broadcast(@[p], RPCMsg(messages: @[msg]), isHighPriority = false)
for p in staggerPeers:
asyncSpawn sendToOne(p)
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
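The publish and validateAndRelay changes above share one staggering pattern: each peer first gets the small, high-priority preamble, while the actual message transfers are serialised through a one-permit semaphore. Below is a self-contained sketch (not part of the diff) of that pattern under chronos; sendPreamble, sendMessage and staggeredSend are hypothetical stand-ins for g.broadcast, and the import path libp2p/utils/semaphore is assumed from the relative ../../utils/semaphore import in the hunk.

import chronos
import std/random
import libp2p/utils/semaphore   # assumed public path for the AsyncSemaphore used in the hunk

proc sendPreamble(peer, msgId: string) =
  echo "preamble -> ", peer, " (", msgId, ")"

proc sendMessage(peer, msgId: string) {.async.} =
  await sleepAsync(50.milliseconds)      # stands in for the actual transfer time
  echo "message  -> ", peer, " (", msgId, ")"

proc staggeredSend(peers: seq[string], msgId: string) {.async.} =
  let sem = newAsyncSemaphore(1)         # only one message transfer in flight at a time
  var order = peers
  shuffle(order)                         # randomise which peer is served first
  proc sendToOne(p: string) {.async.} =
    sendPreamble(p, msgId)               # preamble goes out immediately (high priority)
    await sem.acquire()
    defer: sem.release()
    await sendMessage(p, msgId)          # the full message follows, one peer at a time
  for p in order:
    asyncSpawn sendToOne(p)

randomize()
waitFor staggeredSend(@["peerA", "peerB", "peerC"], "msg-1")
waitFor sleepAsync(500.milliseconds)     # let the spawned sends finish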

View File

@@ -290,14 +290,31 @@ proc handleIHave*(
for ihave in ihaves:
trace "peer sent ihave", peer, topicID = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.topics:
#look here for received IDontWants for the same message
var meshPeers: HashSet[PubSubPeer]
g.mesh.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
g.subscribedDirectPeers.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
for msgId in ihave.messageIDs:
if not g.hasSeen(g.salt(msgId)):
if peer.iHaveBudget <= 0:
break
elif msgId in g.iwantsRequested:
break
elif msgId notin res.messageIDs:
res.messageIDs.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID = msgId
#don't send IWANT if we have received N IDontWant(s) for a msgID
let saltedID = g.salt(msgId)
var numFinds: int = 0
for meshPeer in meshPeers:
for heDontWant in meshPeer.iDontWants:
if saltedID in heDontWant:
numFinds = numFinds + 1
#break;
if numFinds == 0: #currently a single IDontWant is enough to suppress the IWANT
res.messageIDs.add(msgId)
dec peer.iHaveBudget
g.iwantsRequested.incl(msgId)
trace "requested message via ihave", messageID = msgId
# shuffling res.messageIDs before sending it out to increase the likelihood
# of getting an answer if the peer truncates the list due to internal size restrictions.
g.rng.shuffle(res.messageIDs)
@@ -309,12 +326,49 @@ proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWa
if peer.iDontWants[^1].len > 1000:
break
peer.iDontWants[^1].incl(g.salt(messageId))
proc handlePreamble*(g: GossipSub, peer: PubSubPeer, preambles: seq[ControlIHave]) =
for preamble in preambles:
for messageId in preamble.messageIDs:
#Ideally a peer should send at most peer_preamble_announcements preambles for unfinished downloads
#A peer violating this should be penalized through P4???
if peer.heIsSendings[^1].len > 1000:
break
peer.heIsSendings[^1].incl(messageId)
#Experimental change for quick performance evaluation only (ideally for very large messages):
#[
1) An IDontWant is followed by the message; IMReceiving informs peers that we are receiving this message
2) Prototype implementation for a single topic ("test"); we need the topic ID in IDontWant
3) A better solution is to send the message details in a message preamble, which can then be used for IMReceiving
]#
var toSendPeers = HashSet[PubSubPeer]()
g.floodsub.withValue(preamble.topicID, peers): toSendPeers.incl(peers[])
g.mesh.withValue(preamble.topicID, peers): toSendPeers.incl(peers[])
# add direct peers
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(preamble.topicID))
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
imreceiving: @[ControlIWant(messageIDs: @[messageId])]
))), isHighPriority = true)
proc handleIMReceiving*(g: GossipSub,
peer: PubSubPeer,
imreceivings: seq[ControlIWant]) =
for imreceiving in imreceivings:
for messageId in imreceiving.messageIDs:
if peer.heIsReceivings[^1].len > 1000: break
if messageId.len > 100: continue
peer.heIsReceivings[^1].incl(messageId)
proc handleIWant*(
g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]
): seq[Message] =
): tuple[messages: seq[Message], ids: seq[MessageId]] =
var
messages: seq[Message]
#ids: seq[MessageId]
#messages: seq[Message]
invalidRequests = 0
if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer, score = peer.score
@@ -329,14 +383,15 @@ proc handleIWant*(
invalidRequests.inc()
if invalidRequests > 20:
libp2p_gossipsub_received_iwants.inc(1, labelValues = ["skipped"])
return messages
return result
continue
let msg = g.mcache.get(mid).valueOr:
libp2p_gossipsub_received_iwants.inc(1, labelValues = ["unknown"])
continue
libp2p_gossipsub_received_iwants.inc(1, labelValues = ["correct"])
messages.add(msg)
return messages
result.messages.add(msg)
result.ids.add(mid)
return result
proc commitMetrics(metrics: var MeshMetrics) =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
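The handleIHave change above only requests a message when it has not been requested already and no mesh peer has announced an IDontWant for it; a single IDontWant is enough to suppress the IWANT. Below is a small stand-alone sketch (not part of the diff) of that rule, with plain string ids in place of MessageId/SaltedId.

import std/sets

proc shouldRequest(msgId: string,
                   alreadyRequested: HashSet[string],
                   meshIDontWants: seq[HashSet[string]]): bool =
  if msgId in alreadyRequested:
    return false                     # at most one outstanding IWANT per message id
  for dontWants in meshIDontWants:   # one IDontWant set per mesh peer
    if msgId in dontWants:
      return false                   # one IDontWant already suppresses the request
  return true

let
  requested = initHashSet[string]()
  meshState = @[toHashSet(["m1"]), initHashSet[string]()]
assert shouldRequest("m2", requested, meshState)
assert not shouldRequest("m1", requested, meshState)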

View File

@@ -188,6 +188,8 @@ type
heartbeatEvents*: seq[AsyncEvent]
iwantsRequested*: HashSet[MessageId]
MeshMetrics* = object # scratch buffers for metrics
otherPeersPerTopicMesh*: int64
otherPeersPerTopicFanout*: int64

View File

@@ -109,6 +109,8 @@ type
## IDONTWANT contains unvalidated message id:s which may be long and/or
## expensive to look up, so we apply the same salting to them as during
## unvalidated message processing
heIsSendings*: Deque[HashSet[MessageId]]
heIsReceivings*: Deque[HashSet[MessageId]]
iHaveBudget*: int
pingBudget*: int
maxMessageSize: int
@@ -557,4 +559,6 @@ proc new*(
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.iDontWants.addFirst(default(HashSet[SaltedId]))
result.heIsSendings.addFirst(default(HashSet[MessageId]))
result.heIsReceivings.addFirst(default(HashSet[MessageId]))
result.startSendNonPriorityTask()
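heIsSendings and heIsReceivings follow the same bookkeeping as sentIHaves and iDontWants: a Deque of HashSet windows, seeded with one window in new(), with each window capped (1000 entries in the handlers above). Below is a minimal stand-alone sketch (not part of the diff) of that bookkeeping, assuming window rotation happens elsewhere (e.g. on heartbeat).

import std/[deques, sets]

const maxEntriesPerWindow = 1000

var heIsReceivings = initDeque[HashSet[string]]()
heIsReceivings.addFirst(default(HashSet[string]))   # seed the first window, as in new()

proc recordReceiving(msgId: string) =
  # mirrors handleIMReceiving: cap the current window instead of growing without bound
  if heIsReceivings[^1].len > maxEntriesPerWindow:
    return
  heIsReceivings[^1].incl(msgId)

recordReceiving("msg-1")
assert "msg-1" in heIsReceivings[^1]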

View File

@@ -63,6 +63,8 @@ type
graft*: seq[ControlGraft]
prune*: seq[ControlPrune]
idontwant*: seq[ControlIWant]
preamble*: seq[ControlIHave]
imreceiving*: seq[ControlIWant]
ControlIHave* = object
topicID*: string
@@ -173,11 +175,13 @@ proc byteSize(controlPrune: ControlPrune): int =
# 8 bytes for uint64
static:
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "preamble", "imreceiving"])
proc byteSize(control: ControlMessage): int =
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
control.idontwant.foldl(a + b.byteSize, 0)
control.idontwant.foldl(a + b.byteSize, 0) +
control.preamble.foldl(a + b.byteSize, 0) +
control.imreceiving.foldl(a + b.byteSize, 0)
static:
expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])

View File

@@ -89,6 +89,10 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(4, prune)
for idontwant in control.idontwant:
ipb.write(5, idontwant)
for preamble in control.preamble:
ipb.write(6, preamble)
for imreceiving in control.imreceiving:
ipb.write(7, imreceiving)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
@@ -208,6 +212,8 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
var graftpbs: seq[seq[byte]]
var prunepbs: seq[seq[byte]]
var idontwant: seq[seq[byte]]
var preamble: seq[seq[byte]]
var imreceiving: seq[seq[byte]]
if ?cpb.getRepeatedField(1, ihavepbs):
for item in ihavepbs:
control.ihave.add(?decodeIHave(initProtoBuffer(item)))
@@ -223,6 +229,12 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
if ?cpb.getRepeatedField(5, idontwant):
for item in idontwant:
control.idontwant.add(?decodeIWant(initProtoBuffer(item)))
if ?cpb.getRepeatedField(6, preamble):
for item in preamble:
control.preamble.add(?decodeIHave(initProtoBuffer(item)))
if ?cpb.getRepeatedField(7, imreceiving):
for item in imreceiving:
control.imreceiving.add(?decodeIWant(initProtoBuffer(item)))
trace "decodeControl: message statistics",
graft_count = len(control.graft),
prune_count = len(control.prune),