mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 13:58:17 -05:00
Compare commits
1 Commits
v1.9.0
...
dev/etan/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
781fceb654 |
@@ -14,7 +14,7 @@ when (NimMajor, NimMinor) < (1, 4):
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[tables, sets, options, sequtils]
|
||||
import std/[tables, sets, options, random, sequtils]
|
||||
import chronos, chronicles, metrics
|
||||
import ./pubsub,
|
||||
./floodsub,
|
||||
@@ -295,10 +295,26 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||
peer,
|
||||
RPCMsg(control: some(respControl), messages: messages))
|
||||
|
||||
proc sampleDandelionPeers(peers: var HashSet[PubSubPeer]) =
|
||||
const dandelionD = 2
|
||||
if peers.len <= dandelionD:
|
||||
return
|
||||
|
||||
trace "Sampling peers", availablePeers = peers.len, dandelionD
|
||||
var
|
||||
remainingPeers = peers.toSeq()
|
||||
randomPeers: HashSet[PubSubPeer]
|
||||
while randomPeers.len < dandelionD:
|
||||
let index = random(remainingPeers.len)
|
||||
randomPeers.incl remainingPeers[index]
|
||||
remainingPeers.delete index
|
||||
peers = randomPeers
|
||||
|
||||
proc validateAndRelay(g: GossipSub,
|
||||
msg: Message,
|
||||
m: Message,
|
||||
msgId, msgIdSalted: MessageId,
|
||||
peer: PubSubPeer) {.async.} =
|
||||
var msg = m
|
||||
try:
|
||||
let validation = await g.validate(msg)
|
||||
|
||||
@@ -319,6 +335,9 @@ proc validateAndRelay(g: GossipSub,
|
||||
of ValidationResult.Accept:
|
||||
discard
|
||||
|
||||
if msg.stem.get(0) > 0:
|
||||
msg.stem = some(msg.stem.get - 1)
|
||||
|
||||
# store in cache only after validation
|
||||
g.mcache.put(msgId, msg)
|
||||
|
||||
@@ -337,6 +356,9 @@ proc validateAndRelay(g: GossipSub,
|
||||
toSendPeers.excl(peer)
|
||||
toSendPeers.excl(seenPeers)
|
||||
|
||||
if msg.stem.get(0) > 0:
|
||||
toSendPeers.sampleDandelionPeers()
|
||||
|
||||
# In theory, if topics are the same in all messages, we could batch - we'd
|
||||
# also have to be careful to only include validated messages
|
||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
|
||||
@@ -539,13 +561,22 @@ method publish*(g: GossipSub,
|
||||
libp2p_gossipsub_failed_publish.inc()
|
||||
return 0
|
||||
|
||||
peers.sampleDandelionPeers()
|
||||
|
||||
let
|
||||
msg =
|
||||
if g.anonymize:
|
||||
Message.init(none(PeerInfo), data, topic, none(uint64), false)
|
||||
const
|
||||
dandelionStemLo = 3.uint32
|
||||
dandelionStemHi = 6.uint32
|
||||
let stem = range[dandelionStemLo .. (dandelionStemHi - 1)].rand().uint32
|
||||
Message.init(
|
||||
none(PeerInfo), data, topic, none(uint64), false,
|
||||
stem = some stem)
|
||||
else:
|
||||
inc g.msgSeqno
|
||||
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
|
||||
Message.init(
|
||||
some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
|
||||
msgId = g.msgIdProvider(msg).valueOr:
|
||||
trace "Error generating message id, skipping publish",
|
||||
error = error
|
||||
|
||||
@@ -65,7 +65,8 @@ proc init*(
|
||||
data: seq[byte],
|
||||
topic: string,
|
||||
seqno: Option[uint64],
|
||||
sign: bool = true): Message
|
||||
sign: bool = true,
|
||||
stem = none(uint32)): Message
|
||||
{.gcsafe, raises: [Defect, LPError].} =
|
||||
var msg = Message(data: data, topicIDs: @[topic])
|
||||
|
||||
@@ -83,6 +84,8 @@ proc init*(
|
||||
elif sign:
|
||||
raise (ref LPError)(msg: "Cannot sign message without peer info")
|
||||
|
||||
msg.stem = stem
|
||||
|
||||
msg
|
||||
|
||||
proc init*(
|
||||
|
||||
@@ -39,6 +39,7 @@ type
|
||||
topicIds*: seq[string]
|
||||
signature*: seq[byte]
|
||||
key*: seq[byte]
|
||||
stem*: Option[uint32]
|
||||
|
||||
ControlMessage* = object
|
||||
ihave*: seq[ControlIHave]
|
||||
@@ -100,6 +101,16 @@ func shortLog*(c: ControlMessage): auto =
|
||||
prune: mapIt(c.prune, it.shortLog)
|
||||
)
|
||||
|
||||
func shortLog*(s: Option[uint32]): auto =
|
||||
if s.isSome:
|
||||
(
|
||||
stem: $s.get
|
||||
)
|
||||
else:
|
||||
(
|
||||
stem: "(not set)"
|
||||
)
|
||||
|
||||
func shortLog*(msg: Message): auto =
|
||||
(
|
||||
fromPeer: msg.fromPeer.shortLog,
|
||||
@@ -107,7 +118,8 @@ func shortLog*(msg: Message): auto =
|
||||
seqno: msg.seqno.shortLog,
|
||||
topicIds: $msg.topicIds,
|
||||
signature: msg.signature.shortLog,
|
||||
key: msg.key.shortLog
|
||||
key: msg.key.shortLog,
|
||||
stem: msg.stem.shortLog,
|
||||
)
|
||||
|
||||
func shortLog*(m: RPCMsg): auto =
|
||||
|
||||
@@ -117,6 +117,8 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
|
||||
pb.write(5, msg.signature)
|
||||
if len(msg.key) > 0 and not anonymize:
|
||||
pb.write(6, msg.key)
|
||||
if msg.stem.isSome:
|
||||
pb.write(7, msg.stem.get)
|
||||
pb.finish()
|
||||
|
||||
when defined(libp2p_protobuf_metrics):
|
||||
@@ -295,6 +297,12 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
|
||||
trace "decodeMessage: read public key", key = msg.key.shortLog()
|
||||
else:
|
||||
trace "decodeMessage: public key is missing"
|
||||
var stem: uint32
|
||||
if ? pb.getField(7, stem):
|
||||
msg.stem = some stem
|
||||
trace "decodeMessage: read stem", stem = msg.stem.shortLog()
|
||||
else:
|
||||
trace "decodeMessage: stem is missing"
|
||||
ok(msg)
|
||||
|
||||
proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} =
|
||||
|
||||
Reference in New Issue
Block a user