Compare commits

...

1 Commits

Author SHA1 Message Date
Etan Kissling
781fceb654 Dandelion proof-of-concept implementation 2022-09-17 15:50:55 +02:00
4 changed files with 60 additions and 6 deletions

View File

@@ -14,7 +14,7 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[tables, sets, options, sequtils]
import std/[tables, sets, options, random, sequtils]
import chronos, chronicles, metrics
import ./pubsub,
./floodsub,
@@ -295,10 +295,26 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
peer,
RPCMsg(control: some(respControl), messages: messages))
proc sampleDandelionPeers(peers: var HashSet[PubSubPeer]) =
const dandelionD = 2
if peers.len <= dandelionD:
return
trace "Sampling peers", availablePeers = peers.len, dandelionD
var
remainingPeers = peers.toSeq()
randomPeers: HashSet[PubSubPeer]
while randomPeers.len < dandelionD:
let index = random(remainingPeers.len)
randomPeers.incl remainingPeers[index]
remainingPeers.delete index
peers = randomPeers
proc validateAndRelay(g: GossipSub,
msg: Message,
m: Message,
msgId, msgIdSalted: MessageId,
peer: PubSubPeer) {.async.} =
var msg = m
try:
let validation = await g.validate(msg)
@@ -319,6 +335,9 @@ proc validateAndRelay(g: GossipSub,
of ValidationResult.Accept:
discard
if msg.stem.get(0) > 0:
msg.stem = some(msg.stem.get - 1)
# store in cache only after validation
g.mcache.put(msgId, msg)
@@ -337,6 +356,9 @@ proc validateAndRelay(g: GossipSub,
toSendPeers.excl(peer)
toSendPeers.excl(seenPeers)
if msg.stem.get(0) > 0:
toSendPeers.sampleDandelionPeers()
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
@@ -539,13 +561,22 @@ method publish*(g: GossipSub,
libp2p_gossipsub_failed_publish.inc()
return 0
peers.sampleDandelionPeers()
let
msg =
if g.anonymize:
Message.init(none(PeerInfo), data, topic, none(uint64), false)
const
dandelionStemLo = 3.uint32
dandelionStemHi = 6.uint32
let stem = range[dandelionStemLo .. (dandelionStemHi - 1)].rand().uint32
Message.init(
none(PeerInfo), data, topic, none(uint64), false,
stem = some stem)
else:
inc g.msgSeqno
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
Message.init(
some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
msgId = g.msgIdProvider(msg).valueOr:
trace "Error generating message id, skipping publish",
error = error

View File

@@ -65,7 +65,8 @@ proc init*(
data: seq[byte],
topic: string,
seqno: Option[uint64],
sign: bool = true): Message
sign: bool = true,
stem = none(uint32)): Message
{.gcsafe, raises: [Defect, LPError].} =
var msg = Message(data: data, topicIDs: @[topic])
@@ -83,6 +84,8 @@ proc init*(
elif sign:
raise (ref LPError)(msg: "Cannot sign message without peer info")
msg.stem = stem
msg
proc init*(

View File

@@ -39,6 +39,7 @@ type
topicIds*: seq[string]
signature*: seq[byte]
key*: seq[byte]
stem*: Option[uint32]
ControlMessage* = object
ihave*: seq[ControlIHave]
@@ -100,6 +101,16 @@ func shortLog*(c: ControlMessage): auto =
prune: mapIt(c.prune, it.shortLog)
)
func shortLog*(s: Option[uint32]): auto =
if s.isSome:
(
stem: $s.get
)
else:
(
stem: "(not set)"
)
func shortLog*(msg: Message): auto =
(
fromPeer: msg.fromPeer.shortLog,
@@ -107,7 +118,8 @@ func shortLog*(msg: Message): auto =
seqno: msg.seqno.shortLog,
topicIds: $msg.topicIds,
signature: msg.signature.shortLog,
key: msg.key.shortLog
key: msg.key.shortLog,
stem: msg.stem.shortLog,
)
func shortLog*(m: RPCMsg): auto =

View File

@@ -117,6 +117,8 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
pb.write(5, msg.signature)
if len(msg.key) > 0 and not anonymize:
pb.write(6, msg.key)
if msg.stem.isSome:
pb.write(7, msg.stem.get)
pb.finish()
when defined(libp2p_protobuf_metrics):
@@ -295,6 +297,12 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
trace "decodeMessage: read public key", key = msg.key.shortLog()
else:
trace "decodeMessage: public key is missing"
var stem: uint32
if ? pb.getField(7, stem):
msg.stem = some stem
trace "decodeMessage: read stem", stem = msg.stem.shortLog()
else:
trace "decodeMessage: stem is missing"
ok(msg)
proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} =