feat(gossipsub1.4): adding new attributes and protobuffers (#1515)

This commit is contained in:
richΛrd
2025-07-12 12:12:06 -04:00
committed by GitHub
parent 998bb58aef
commit e83bd2d582
6 changed files with 206 additions and 12 deletions

View File

@@ -56,7 +56,7 @@ task testinterop, "Runs interop tests":
runTest("testinterop")
task testpubsub, "Runs pubsub tests":
runTest("pubsub/testpubsub")
runTest("pubsub/testpubsub", "-d:libp2p_gossipsub_1_4")
task testfilter, "Run PKI filter test":
runTest("testpkifilter")

View File

@@ -0,0 +1,37 @@
import chronos
import std/atomics
const DefaultAlpha = 0.3
const InitialRate = 2_500_000 #bytes per second
type
ExponentialMovingAverage* = ref object
alpha: float
value: Atomic[float64]
BandwidthTracking* = ref object
download*: ExponentialMovingAverage
proc init*(T: type[ExponentialMovingAverage], alpha: float = DefaultAlpha): T =
let e = ExponentialMovingAverage(alpha: alpha)
e.value.store(InitialRate)
return e
proc init*(T: type[BandwidthTracking], alpha: float = DefaultAlpha): T =
BandwidthTracking(download: ExponentialMovingAverage())
proc update*(e: var ExponentialMovingAverage, startAt: Moment, bytes: int) =
let elapsedTime = Moment.now() - startAt
let curSample = float(bytes * 1000) / elapsedTime.milliseconds.float
let oldSample = e.value.load()
let ema = e.alpha * curSample + (1.0 - e.alpha) * oldSample
e.value.store(ema)
proc value*(e: var ExponentialMovingAverage): float =
e.value.load()
proc calculateReceiveTimeMs*(msgLen: int64, dataRate: int64 = InitialRate): int64 =
let txTime = ((msgLen * 1000) div dataRate)
#ideally (RTT * 2) + 5% TxTime ? Need many testruns to precisely adjust safety margin
let margin = 250 + (txTime.float64 * 0.05)
result = txTime + margin.int64

View File

@@ -18,6 +18,7 @@ import "../../.."/[peerid, multiaddress, utility]
export options, tables, sets
const
GossipSubCodec_14* = "/meshsub/1.4.0"
GossipSubCodec_12* = "/meshsub/1.2.0"
GossipSubCodec_11* = "/meshsub/1.1.0"
GossipSubCodec_10* = "/meshsub/1.0.0"
@@ -46,6 +47,8 @@ const
BackoffSlackTime* = 2 # seconds
PingsPeerBudget* = 100 # maximum of 6.4kb/heartbeat (6.4kb/s with default 1 second/hb)
IHavePeerBudget* = 10
PreamblePeerBudget* = 10
PullOperation* = true
# the max amount of IHave to expose, not by spec, but go as example
# rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572
# go: https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L155
@@ -200,6 +203,9 @@ type
mcache*: MCache # messages cache
validationSeen*: ValidationSeenTable # peers who sent us message in validation
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
when defined(libp2p_gossipsub_1_4):
preambleExpirationFut*: Future[void]
# cancellation future for preamble expiration heartbeat interval
scoringHeartbeatFut*: Future[void]
# cancellation future for scoring heartbeat interval
heartbeatRunning*: bool
@@ -213,6 +219,11 @@ type
heartbeatEvents*: seq[AsyncEvent]
when defined(libp2p_gossipsub_1_4):
ongoingReceives*: OngoingReceivesStore # list of messages we are receiving
ongoingIWantReceives*: OngoingReceivesStore
# list of iwant replies we are receiving
MeshMetrics* = object # scratch buffers for metrics
otherPeersPerTopicMesh*: int64
otherPeersPerTopicFanout*: int64

View File

@@ -122,6 +122,9 @@ type
handler*: RPCHandler
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
when defined(libp2p_gossipsub_1_4):
bandwidthTracking*: BandwidthTracking
score*: float64
sentIHaves*: Deque[HashSet[MessageId]]
iDontWants*: Deque[HashSet[SaltedId]]
@@ -135,6 +138,11 @@ type
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]
when defined(libp2p_gossipsub_1_4):
preambleBudget*: int
heIsReceivings*: Table[MessageId, uint32]
heIsSendings*: Table[MessageId, Moment]
rpcmessagequeue: RpcMessageQueue
maxNumElementsInNonPriorityQueue*: int
# The max number of elements allowed in the non-priority queue.
@@ -610,6 +618,11 @@ proc new*(
maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue,
customConnCallbacks: customConnCallbacks,
)
when defined(libp2p_gossipsub_1_4):
result.bandwidthTracking =
BandwidthTracking(download: ExponentialMovingAverage.init())
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.iDontWants.addFirst(default(HashSet[SaltedId]))
result.startSendNonPriorityTask()

View File

@@ -63,6 +63,9 @@ type
graft*: seq[ControlGraft]
prune*: seq[ControlPrune]
idontwant*: seq[ControlIWant]
when defined(libp2p_gossipsub_1_4):
preamble*: seq[ControlPreamble]
imreceiving*: seq[ControlIMReceiving]
ControlIHave* = object
topicID*: string
@@ -84,6 +87,10 @@ type
messageID*: MessageId
messageLength*: uint32
ControlIMReceiving* = object
messageID*: MessageId
messageLength*: uint32
RPCMsg* = object
subscriptions*: seq[SubOpts]
messages*: seq[Message]
@@ -106,13 +113,29 @@ func shortLog*(s: ControlGraft): auto =
func shortLog*(s: ControlPrune): auto =
(topic: s.topicID.shortLog)
func shortLog*(s: ControlPreamble): auto =
(topic: s.topicID.shortLog, messageID: s.messageID.shortLog)
func shortLog*(s: ControlIMReceiving): auto =
(messageID: s.messageID.shortLog)
func shortLog*(c: ControlMessage): auto =
(
ihave: mapIt(c.ihave, it.shortLog),
iwant: mapIt(c.iwant, it.shortLog),
graft: mapIt(c.graft, it.shortLog),
prune: mapIt(c.prune, it.shortLog),
)
when defined(libp2p_gossipsub_1_4):
(
ihave: mapIt(c.ihave, it.shortLog),
iwant: mapIt(c.iwant, it.shortLog),
graft: mapIt(c.graft, it.shortLog),
prune: mapIt(c.prune, it.shortLog),
preamble: mapIt(c.preamble, it.shortLog),
imreceiving: mapIt(c.imreceiving, it.shortLog),
)
else:
(
ihave: mapIt(c.ihave, it.shortLog),
iwant: mapIt(c.iwant, it.shortLog),
graft: mapIt(c.graft, it.shortLog),
prune: mapIt(c.prune, it.shortLog),
)
func shortLog*(msg: Message): auto =
(
@@ -178,11 +201,41 @@ proc byteSize(controlPrune: ControlPrune): int =
# 8 bytes for uint64
static:
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
proc byteSize(control: ControlMessage): int =
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
control.idontwant.foldl(a + b.byteSize, 0)
expectedFields(ControlPreamble, @["topicID", "messageID", "messageLength"])
proc byteSize(controlPreamble: ControlPreamble): int =
controlPreamble.topicID.len + controlPreamble.messageID.len + 4 # 4 bytes for uint32
proc byteSize*(preambles: seq[ControlPreamble]): int =
preambles.foldl(a + b.byteSize, 0)
static:
expectedFields(ControlIMReceiving, @["messageID", "messageLength"])
proc byteSize(controlIMreceiving: ControlIMReceiving): int =
controlIMreceiving.messageID.len + 4 # 4 bytes for uint32
proc byteSize*(imreceivings: seq[ControlIMReceiving]): int =
imreceivings.foldl(a + b.byteSize, 0)
when defined(libp2p_gossipsub_1_4):
static:
expectedFields(
ControlMessage,
@["ihave", "iwant", "graft", "prune", "idontwant", "preamble", "imreceiving"],
)
proc byteSize(control: ControlMessage): int =
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
control.idontwant.foldl(a + b.byteSize, 0) +
control.preamble.foldl(a + b.byteSize, 0) +
control.imreceiving.foldl(a + b.byteSize, 0)
else:
static:
expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
proc byteSize(control: ControlMessage): int =
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
control.idontwant.foldl(a + b.byteSize, 0)
static:
expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])

View File

@@ -77,6 +77,31 @@ proc write*(pb: var ProtoBuffer, field: int, iwant: ControlIWant) =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["iwant"])
proc write*(pb: var ProtoBuffer, field: int, preamble: ControlPreamble) =
var ipb = initProtoBuffer()
ipb.write(1, preamble.topicID)
ipb.write(2, preamble.messageID)
ipb.write(3, preamble.messageLength)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["preamble"])
proc write*(pb: var ProtoBuffer, field: int, imreceiving: ControlIMReceiving) =
var ipb = initProtoBuffer()
ipb.write(1, imreceiving.messageID)
ipb.write(2, imreceiving.messageLength)
if ipb.buffer.len > 0:
ipb.finish()
pb.write(field, ipb)
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(ipb.getLen().int64, labelValues = ["imreceiving"])
proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
var ipb = initProtoBuffer()
for ihave in control.ihave:
@@ -89,6 +114,11 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(4, prune)
for idontwant in control.idontwant:
ipb.write(5, idontwant)
when defined(libp2p_gossipsub_1_4):
for preamble in control.preamble:
ipb.write(6, preamble)
for imreceiving in control.imreceiving:
ipb.write(7, imreceiving)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
@@ -197,6 +227,43 @@ proc decodeIWant*(pb: ProtoBuffer): ProtoResult[ControlIWant] {.inline.} =
trace "decodeIWant: no messageIDs"
ok(control)
proc decodePreamble*(pb: ProtoBuffer): ProtoResult[ControlPreamble] {.inline.} =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["preamble"])
trace "decodePreamble: decoding message"
var control = ControlPreamble()
if ?pb.getField(1, control.topicID):
trace "decodePreamble: read topicID", topic = control.topicID
else:
trace "decodePreamble: topicID is missing"
if ?pb.getField(2, control.messageID):
trace "decodePreamble: read messageID", message_id = control.messageID
else:
trace "decodePreamble: messageID is missing"
if ?pb.getField(3, control.messageLength):
trace "decodePreamble: read message Length", message_length = control.messageLength
else:
trace "decodePreamble: message Length is missing"
ok(control)
proc decodeIMReceiving*(pb: ProtoBuffer): ProtoResult[ControlIMReceiving] {.inline.} =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_read.inc(pb.getLen().int64, labelValues = ["imreceiving"])
trace "decodeIMReceiving: decoding message"
var control = ControlIMReceiving()
if ?pb.getField(1, control.messageID):
trace "decodeIMReceiving: read messageID", message_id = control.messageID
else:
trace "decodeIMReceiving: messageID is missing"
if ?pb.getField(2, control.messageLength):
trace "decodeIMReceiving: read message Length",
message_length = control.messageLength
else:
trace "decodeIMReceiving: message Length is missing"
ok(control)
proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inline.} =
trace "decodeControl: decoding message"
var buffer: seq[byte]
@@ -208,6 +275,10 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
var graftpbs: seq[seq[byte]]
var prunepbs: seq[seq[byte]]
var idontwant: seq[seq[byte]]
when defined(libp2p_gossipsub_1_4):
var preamble: seq[seq[byte]]
var imreceiving: seq[seq[byte]]
if ?cpb.getRepeatedField(1, ihavepbs):
for item in ihavepbs:
control.ihave.add(?decodeIHave(initProtoBuffer(item)))
@@ -223,6 +294,15 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.inli
if ?cpb.getRepeatedField(5, idontwant):
for item in idontwant:
control.idontwant.add(?decodeIWant(initProtoBuffer(item)))
when defined(libp2p_gossipsub_1_4):
if ?cpb.getRepeatedField(6, preamble):
for item in preamble:
control.preamble.add(?decodePreamble(initProtoBuffer(item)))
if ?cpb.getRepeatedField(7, imreceiving):
for item in imreceiving:
control.imreceiving.add(?decodeIMReceiving(initProtoBuffer(item)))
trace "decodeControl: message statistics",
graft_count = len(control.graft),
prune_count = len(control.prune),