Mirror of https://github.com/vacp2p/nim-libp2p.git (synced 2026-01-10 12:28:18 -05:00)

Compare commits: research_g...stg (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 05446734e3 | |
| | 64ef502c87 | |
| | b2a75fc25e | |
| | 4b691b6374 | |
| | 8bb6215d8a | |
| | f1b78f6be6 | |
| | 8377eb0362 | |
| | 35d1876ad8 | |
```diff
@@ -36,6 +36,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
 
 export types, scoring, behavior, pubsub
 
+import std/atomics
+const WARMUP_THRESHOLD = 2
+var
+  lma_dup_during_validation: Atomic[uint32] # number of duplicates during first-message validation
+  lma_idontwant_saves: Atomic[uint32] # number of transmissions saved due to idontwant
+  lma_duplicate_count: Atomic[uint32] # number of duplicate messages received
+  lma_iwants_sent: Atomic[uint32] # number of iwant requests sent
+  lma_iwants_replied: Atomic[uint32] # number of iwant requests that were replied to
+  lma_imreceiving_saves: Atomic[uint32] # number of messages saved due to imreceiving
+  lma_unique_receives: Atomic[uint32] # number of unique messages received
+  lma_mesh_recvs_aftar_iwant: Atomic[uint32] # messages received from the mesh after sending an iwant request
+  lma_warmup_messages: Atomic[uint32] # don't issue idontwant while this is < WARMUP_THRESHOLD
+
+lma_dup_during_validation.store(0)
+lma_idontwant_saves.store(0)
+lma_duplicate_count.store(0)
+lma_iwants_sent.store(0)
+lma_iwants_replied.store(0)
+lma_imreceiving_saves.store(0)
+lma_unique_receives.store(0)
+lma_mesh_recvs_aftar_iwant.store(0)
+lma_warmup_messages.store(0)
+
+export lma_dup_during_validation, lma_idontwant_saves, lma_duplicate_count, lma_iwants_sent,
+  lma_iwants_replied, lma_imreceiving_saves, lma_unique_receives, lma_mesh_recvs_aftar_iwant
+
 logScope:
   topics = "libp2p gossipsub"
 
```
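The counters are ordinary `std/atomics` values, so an importing test harness can snapshot them once a run is over. A minimal sketch, assuming the exports above are in scope (`reportLmaStats` itself is illustrative, not part of this changeset):

```nim
# Illustrative helper: print a snapshot of the exported lma_* counters.
proc reportLmaStats() =
  echo "unique receives:        ", lma_unique_receives.load()
  echo "duplicates:             ", lma_duplicate_count.load()
  echo "dups during validation: ", lma_dup_during_validation.load()
  echo "idontwant saves:        ", lma_idontwant_saves.load()
  echo "imreceiving saves:      ", lma_imreceiving_saves.load()
  echo "iwants sent / replied:  ", lma_iwants_sent.load(), " / ", lma_iwants_replied.load()
  echo "mesh recvs after iwant: ", lma_mesh_recvs_aftar_iwant.load()
```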
```diff
@@ -226,6 +252,7 @@ method init*(g: GossipSub) =
   g.codecs &= GossipSubCodec_12
   g.codecs &= GossipSubCodec_11
   g.codecs &= GossipSubCodec_10
+  g.iwantsRequested = initHashSet[MessageId]()
 
 method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
   g.withPeerStats(peer.peerId) do(stats: var PeerStats):
```
```diff
@@ -347,6 +374,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
 
   var respControl: ControlMessage
   g.handleIDontWant(peer, control.idontwant)
+  g.handleIMReceiving(peer, control.imreceiving)
   let iwant = g.handleIHave(peer, control.ihave)
   if iwant.messageIDs.len > 0:
     respControl.iwant.add(iwant)
```
```diff
@@ -360,6 +388,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
   if isPruneNotEmpty or isIWantNotEmpty:
     if isIWantNotEmpty:
       libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
+      lma_iwants_sent.atomicInc(respControl.iwant.len.uint32)
 
     if isPruneNotEmpty:
       for prune in respControl.prune:
```
```diff
@@ -381,6 +410,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
 
     # iwant replies have lower priority
     trace "sending iwant reply messages", peer
+    lma_iwants_replied.atomicInc(messages.len.uint32)
    g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
 
 proc validateAndRelay(
```
```diff
@@ -397,6 +427,8 @@ proc validateAndRelay(
       toSendPeers.incl(peers[])
     g.subscribedDirectPeers.withValue(topic, peers):
       toSendPeers.incl(peers[])
+    if not (peer in toSendPeers):
+      lma_mesh_recvs_aftar_iwant.atomicInc()
     toSendPeers.excl(peer)
 
   if msg.data.len > max(512, msgId.len * 10):
```
```diff
@@ -409,25 +441,39 @@ proc validateAndRelay(
     # descored) and that the savings from honest peers are greater than the
     # cost a dishonest peer can incur in short time (since the IDONTWANT is
     # small).
-    var peersToSendIDontWant = HashSet[PubSubPeer]()
-    addToSendPeers(peersToSendIDontWant)
-    peersToSendIDontWant.exclIfIt(
-      it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
-    )
-    g.broadcast(
-      peersToSendIDontWant,
-      RPCMsg(
-        control:
-          some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
-      ),
-      isHighPriority = true,
-    )
+    if lma_warmup_messages.load() < WARMUP_THRESHOLD:
+      lma_warmup_messages.atomicInc()
+      if lma_warmup_messages.load() == WARMUP_THRESHOLD:
+        lma_dup_during_validation.store(0)
+        lma_idontwant_saves.store(0)
+        lma_duplicate_count.store(0)
+        lma_iwants_sent.store(0)
+        lma_iwants_replied.store(0)
+        lma_imreceiving_saves.store(0)
+        lma_unique_receives.store(0)
+        lma_mesh_recvs_aftar_iwant.store(0)
+
+    else:
+      var peersToSendIDontWant = HashSet[PubSubPeer]()
+      addToSendPeers(peersToSendIDontWant)
+      peersToSendIDontWant.exclIfIt(
+        it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
+      )
+      g.broadcast(
+        peersToSendIDontWant,
+        RPCMsg(
+          control:
+            some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
+        ),
+        isHighPriority = true,
+      )
 
     let validation = await g.validate(msg)
 
     var seenPeers: HashSet[PubSubPeer]
     discard g.validationSeen.pop(saltedId, seenPeers)
     libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
+    lma_dup_during_validation.atomicInc(seenPeers.len.uint32)
     libp2p_gossipsub_saved_bytes.inc(
       (msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]
     )
```
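The warm-up branch skips the IDONTWANT broadcast for the first WARMUP_THRESHOLD messages and zeroes every counter once, when the threshold is crossed, so the exported statistics describe steady-state traffic only. The gate boils down to this standalone pattern (names are illustrative; gossipsub dispatch is single-threaded, so the load/increment pair is not racy in practice):

```nim
import std/atomics

const WARMUP_THRESHOLD = 2
var warmupCount: Atomic[uint32]

# True while still warming up; invokes `resetStats` on the call that
# crosses the threshold, so stats cover only post-warm-up traffic.
proc inWarmup(resetStats: proc() {.closure.}): bool =
  if warmupCount.load() < WARMUP_THRESHOLD:
    warmupCount.atomicInc()
    if warmupCount.load() == WARMUP_THRESHOLD:
      resetStats()
    true
  else:
    false
```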
```diff
@@ -463,6 +509,17 @@ proc validateAndRelay(
     # Don't send it to peers that sent it during validation
     toSendPeers.excl(seenPeers)
 
+    # We have received IMReceiving from these peers, so skip sending to them now.
+    # Ideally we should wait (TxTime + a large safety cushion) before sending to these peers.
+    var receivingPeers: HashSet[PubSubPeer]
+    for pr in toSendPeers:
+      for heIsReceiving in pr.heIsReceivings:
+        if msgId in heIsReceiving:
+          receivingPeers.incl(pr)
+          break
+    toSendPeers.excl(receivingPeers)
+    lma_imreceiving_saves.atomicInc(receivingPeers.len.uint32)
+
     proc isMsgInIdontWant(it: PubSubPeer): bool =
       for iDontWant in it.iDontWants:
         if saltedId in iDontWant:
```
```diff
@@ -470,6 +527,7 @@ proc validateAndRelay(
           libp2p_gossipsub_saved_bytes.inc(
             msg.data.len.int64, labelValues = ["idontwant"]
           )
+          lma_idontwant_saves.atomicInc()
           return true
       return false
 
```
```diff
@@ -596,11 +654,13 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
           g.rewardDelivered(peer, topic, false, delay)
 
         libp2p_gossipsub_duplicate.inc()
+        lma_duplicate_count.atomicInc()
 
         # onto the next message
         continue
 
       libp2p_gossipsub_received.inc()
+      lma_unique_receives.atomicInc()
 
       # avoid processing messages we are not interested in
       if topic notin g.topics:
```
```diff
@@ -290,14 +290,31 @@ proc handleIHave*(
   for ihave in ihaves:
     trace "peer sent ihave", peer, topicID = ihave.topicID, msgs = ihave.messageIDs
     if ihave.topicID in g.topics:
+      # look for received idontwants for the same message
+      var meshPeers: HashSet[PubSubPeer]
+      g.mesh.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
+      g.subscribedDirectPeers.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
+
       for msgId in ihave.messageIDs:
         if not g.hasSeen(g.salt(msgId)):
           if peer.iHaveBudget <= 0:
             break
+          elif msgId in g.iwantsRequested:
+            break
           elif msgId notin res.messageIDs:
-            res.messageIDs.add(msgId)
-            dec peer.iHaveBudget
-            trace "requested message via ihave", messageID = msgId
+            # don't send IWANT if we have received (N number of) IDontWant(s) for this msgID
+            let saltedID = g.salt(msgId)
+            var numFinds: int = 0
+            for meshPeer in meshPeers:
+              for heDontWant in meshPeer.iDontWants:
+                if saltedID in heDontWant:
+                  numFinds = numFinds + 1
+                  # break
+            if numFinds == 0: # we currently wait for 1 IDontWant
+              res.messageIDs.add(msgId)
+              dec peer.iHaveBudget
+              g.iwantsRequested.incl(msgId)
+              trace "requested message via ihave", messageID = msgId
   # shuffling res.messageIDs before sending it out to increase the likelihood
   # of getting an answer if the peer truncates the list due to internal size restrictions.
   g.rng.shuffle(res.messageIDs)
```
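The IWANT-suppression rule above reduces to a small predicate: count how many mesh peers have already sent IDONTWANT for the salted id, and only request the message while the count is below a threshold. A condensed restatement, assuming nim-libp2p's `PubSubPeer` and `SaltedId` types are in scope (`countIDontWants` and the threshold constant are illustrative names, not part of the diff):

```nim
import std/sets

const IDONTWANT_SUPPRESS_THRESHOLD = 1 # the diff waits for a single IDontWant

# Count mesh peers that have announced IDONTWANT for this salted message id.
proc countIDontWants(meshPeers: HashSet[PubSubPeer], saltedID: SaltedId): int =
  for meshPeer in meshPeers:
    for heDontWant in meshPeer.iDontWants:
      if saltedID in heDontWant:
        inc result

# Request via IWANT only while no mesh peer has refused the message yet.
let shouldRequest = countIDontWants(meshPeers, saltedID) < IDONTWANT_SUPPRESS_THRESHOLD
```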
```diff
@@ -309,6 +326,35 @@ proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWant]) =
       if peer.iDontWants[^1].len > 1000:
         break
       peer.iDontWants[^1].incl(g.salt(messageId))
+
+      # Experimental change for quick performance evaluation only (ideally for very large messages):
+      #[
+        1) IDontWant is followed by the message; IMReceiving informs peers that we are receiving this message.
+        2) Prototype implementation for a single topic ("test"); we need the topic ID in IDontWant.
+
+        3) A better solution is to send message details in a message preamble, which can be used for IMReceiving.
+      ]#
+      var toSendPeers = HashSet[PubSubPeer]()
+      g.floodsub.withValue("test", peers): toSendPeers.incl(peers[])
+      g.mesh.withValue("test", peers): toSendPeers.incl(peers[])
+
+      # add direct peers
+      toSendPeers.incl(g.subscribedDirectPeers.getOrDefault("test"))
+
+      g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
+        imreceiving: @[ControlIWant(messageIDs: @[messageId])]
+      ))), isHighPriority = true)
+
+
+
+proc handleIMReceiving*(g: GossipSub,
+                        peer: PubSubPeer,
+                        imreceivings: seq[ControlIWant]) =
+  for imreceiving in imreceivings:
+    for messageId in imreceiving.messageIDs:
+      if peer.heIsReceivings[^1].len > 1000: break
+      if messageId.len > 100: continue
+      peer.heIsReceivings[^1].incl(messageId)
 
 proc handleIWant*(
     g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]
```
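The prototype hardcodes the `"test"` topic because IDONTWANT does not carry a topic id on the wire (note 2 in the comment above). A hypothetical topic-generic variant of the same broadcast, which would only become possible once the topic travels with the control message:

```nim
# Hypothetical generalization of the hardcoded-"test" broadcast above;
# `announceIMReceiving` is not part of this changeset.
proc announceIMReceiving(g: GossipSub, topic: string, messageId: MessageId) =
  var toSendPeers = HashSet[PubSubPeer]()
  g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
  g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
  toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(topic))
  g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
    imreceiving: @[ControlIWant(messageIDs: @[messageId])]
  ))), isHighPriority = true)
```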
```diff
@@ -188,6 +188,8 @@ type
 
     heartbeatEvents*: seq[AsyncEvent]
 
+    iwantsRequested*: HashSet[MessageId]
+
   MeshMetrics* = object # scratch buffers for metrics
     otherPeersPerTopicMesh*: int64
     otherPeersPerTopicFanout*: int64
```
```diff
@@ -109,6 +109,7 @@ type
       ## IDONTWANT contains unvalidated message id:s which may be long and/or
       ## expensive to look up, so we apply the same salting to them as during
       ## unvalidated message processing
+    heIsReceivings*: Deque[HashSet[MessageId]]
     iHaveBudget*: int
     pingBudget*: int
     maxMessageSize: int
```
```diff
@@ -557,4 +558,5 @@ proc new*(
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.iDontWants.addFirst(default(HashSet[SaltedId]))
+  result.heIsReceivings.addFirst(default(HashSet[MessageId]))
   result.startSendNonPriorityTask()
```
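`heIsReceivings` follows the same history-window pattern as `sentIHaves` and `iDontWants`: a deque of hash sets where the newest window is written at `[^1]` and lookups scan every window. A self-contained sketch of that pattern (window rotation itself is not shown in this diff):

```nim
import std/[deques, sets, sequtils]

# Illustrative model of the per-peer history windows used above.
var windows = initDeque[HashSet[string]]()
windows.addFirst(default(HashSet[string])) # one window, as in `new*` above

windows[^1].incl("msg-1")           # record into the newest window
assert windows.anyIt("msg-1" in it) # lookups scan every window
```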
```diff
@@ -63,6 +63,7 @@ type
     graft*: seq[ControlGraft]
     prune*: seq[ControlPrune]
     idontwant*: seq[ControlIWant]
+    imreceiving*: seq[ControlIWant]
 
   ControlIHave* = object
     topicID*: string
```
```diff
@@ -173,11 +174,12 @@ proc byteSize(controlPrune: ControlPrune): int =
   # 8 bytes for uint64
 
 static:
-  expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
+  expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "imreceiving"])
 proc byteSize(control: ControlMessage): int =
   control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
     control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
-    control.idontwant.foldl(a + b.byteSize, 0)
+    control.idontwant.foldl(a + b.byteSize, 0) +
+    control.imreceiving.foldl(a + b.byteSize, 0)
 
 static:
   expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])
```
```diff
@@ -89,6 +89,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
     ipb.write(4, prune)
   for idontwant in control.idontwant:
     ipb.write(5, idontwant)
+  for imreceiving in control.imreceiving:
+    ipb.write(6, imreceiving)
   if len(ipb.buffer) > 0:
     ipb.finish()
     pb.write(field, ipb)
```
```diff
@@ -208,6 +210,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inline.} =
     var graftpbs: seq[seq[byte]]
     var prunepbs: seq[seq[byte]]
     var idontwant: seq[seq[byte]]
+    var imreceiving: seq[seq[byte]]
     if ?cpb.getRepeatedField(1, ihavepbs):
       for item in ihavepbs:
         control.ihave.add(?decodeIHave(initProtoBuffer(item)))
```
```diff
@@ -223,6 +226,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inline.} =
     if ?cpb.getRepeatedField(5, idontwant):
       for item in idontwant:
         control.idontwant.add(?decodeIWant(initProtoBuffer(item)))
+    if ?cpb.getRepeatedField(6, imreceiving):
+      for item in imreceiving:
+        control.imreceiving.add(?decodeIWant(initProtoBuffer(item)))
     trace "decodeControl: message statistics",
       graft_count = len(control.graft),
       prune_count = len(control.prune),
```
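On the wire, `imreceiving` reuses the `ControlIWant` shape under field number 6 of the control message. A hedged round-trip sketch, assuming nim-libp2p's protobuf helpers and the types above are imported, and that the control message sits at field 3 of the top-level RPC (as in the pubsub RPC schema):

```nim
# Round-trip sketch for the new imreceiving field (not a test from the repo).
let ctrl = ControlMessage(
  imreceiving: @[ControlIWant(messageIDs: @[@[0x01'u8, 0x02, 0x03]])]
)
var pb = initProtoBuffer()
pb.write(3, ctrl) # uses the write overload extended above
pb.finish()

let decoded = decodeControl(initProtoBuffer(pb.buffer))
# On success: decoded.get().get().imreceiving[0].messageIDs[0] == @[1'u8, 2, 3]
```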