mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Update Gossip Parameters (#8683)
* add in more accurate aggregate parameters * add more param changes * more cleanup * fix order of operations * comments * remove redundant declaration * clean up better * fix up * victor's review * disable mesh scoring * disable mesh scoring Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> Co-authored-by: Victor Farazdagi <simple.square@gmail.com>
This commit is contained in:
@@ -2,6 +2,7 @@ package p2p
|
||||
|
||||
import (
|
||||
"math"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -32,9 +34,22 @@ const (
|
||||
// our voluntary exit topic.
|
||||
voluntaryExitWeight = 0.05
|
||||
|
||||
// maxInMeshScore describes the max score a peer can attain from being in the mesh.
|
||||
maxInMeshScore = 10
|
||||
// maxFirstDeliveryScore describes the max score a peer can obtain from first deliveries.
|
||||
maxFirstDeliveryScore = 40
|
||||
|
||||
// decayToZero specifies the terminal value that we will use when decaying
|
||||
// a value.
|
||||
decayToZero = 0.01
|
||||
|
||||
// dampeningFactor reduces the amount by which the various thresholds and caps are created.
|
||||
dampeningFactor = 90
|
||||
)
|
||||
|
||||
var (
|
||||
// a bool to check if we enable scoring for messages in the mesh sent for near first deliveries.
|
||||
meshDeliveryIsScored = false
|
||||
)
|
||||
|
||||
func peerScoringParams() (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds) {
|
||||
@@ -74,9 +89,9 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro
|
||||
case strings.Contains(topic, "beacon_block"):
|
||||
return defaultBlockTopicParams(), nil
|
||||
case strings.Contains(topic, "beacon_aggregate_and_proof"):
|
||||
return defaultAggregateTopicParams(activeValidators), nil
|
||||
return defaultAggregateTopicParams(activeValidators)
|
||||
case strings.Contains(topic, "beacon_attestation"):
|
||||
return defaultAggregateSubnetTopicParams(activeValidators), nil
|
||||
return defaultAggregateSubnetTopicParams(activeValidators)
|
||||
case strings.Contains(topic, "voluntary_exit"):
|
||||
return defaultVoluntaryExitTopicParams(), nil
|
||||
case strings.Contains(topic, "proposer_slashing"):
|
||||
@@ -110,96 +125,139 @@ func (s *Service) retrieveActiveValidators() (uint64, error) {
|
||||
return helpers.ActiveValidatorCount(bState, helpers.CurrentEpoch(bState))
|
||||
}
|
||||
|
||||
// Based on Ben's tested parameters for lighthouse.
|
||||
// Based on the lighthouse parameters.
|
||||
// https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
|
||||
|
||||
func defaultBlockTopicParams() *pubsub.TopicScoreParams {
|
||||
decayEpoch := time.Duration(5)
|
||||
blocksPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch)
|
||||
meshWeight := -0.717
|
||||
if !meshDeliveryIsScored {
|
||||
// Set the mesh weight as zero as a temporary measure, so as to prevent
|
||||
// the average nodes from being penalised.
|
||||
meshWeight = 0
|
||||
}
|
||||
return &pubsub.TopicScoreParams{
|
||||
TopicWeight: beaconBlockWeight,
|
||||
TimeInMeshWeight: 0.0324,
|
||||
TimeInMeshQuantum: 1 * oneSlotDuration(),
|
||||
TimeInMeshCap: 300,
|
||||
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
|
||||
TimeInMeshQuantum: inMeshTime(),
|
||||
TimeInMeshCap: inMeshCap(),
|
||||
FirstMessageDeliveriesWeight: 1,
|
||||
FirstMessageDeliveriesDecay: scoreDecay(20 * oneEpochDuration()),
|
||||
FirstMessageDeliveriesCap: 23,
|
||||
MeshMessageDeliveriesWeight: -0.717,
|
||||
MeshMessageDeliveriesWeight: meshWeight,
|
||||
MeshMessageDeliveriesDecay: scoreDecay(decayEpoch * oneEpochDuration()),
|
||||
MeshMessageDeliveriesCap: float64(blocksPerEpoch * uint64(decayEpoch)),
|
||||
MeshMessageDeliveriesThreshold: float64(blocksPerEpoch*uint64(decayEpoch)) / 10,
|
||||
MeshMessageDeliveriesWindow: 2 * time.Second,
|
||||
MeshMessageDeliveriesActivation: 4 * oneEpochDuration(),
|
||||
MeshFailurePenaltyWeight: -0.717,
|
||||
MeshFailurePenaltyWeight: meshWeight,
|
||||
MeshFailurePenaltyDecay: scoreDecay(decayEpoch * oneEpochDuration()),
|
||||
InvalidMessageDeliveriesWeight: -140.4475,
|
||||
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
|
||||
}
|
||||
}
|
||||
|
||||
func defaultAggregateTopicParams(activeValidators uint64) *pubsub.TopicScoreParams {
|
||||
aggPerEpoch := aggregatorsPerSlot(activeValidators) * uint64(params.BeaconConfig().SlotsPerEpoch)
|
||||
func defaultAggregateTopicParams(activeValidators uint64) (*pubsub.TopicScoreParams, error) {
|
||||
// Determine the expected message rate for the particular gossip topic.
|
||||
aggPerSlot := aggregatorsPerSlot(activeValidators)
|
||||
firstMessageCap, err := decayLimit(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot*2/gossipSubD))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
firstMessageWeight := maxFirstDeliveryScore / firstMessageCap
|
||||
meshThreshold, err := decayThreshold(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot)/dampeningFactor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
meshWeight := -scoreByWeight(aggregateWeight, meshThreshold)
|
||||
meshCap := 4 * meshThreshold
|
||||
if !meshDeliveryIsScored {
|
||||
// Set the mesh weight as zero as a temporary measure, so as to prevent
|
||||
// the average nodes from being penalised.
|
||||
meshWeight = 0
|
||||
}
|
||||
return &pubsub.TopicScoreParams{
|
||||
TopicWeight: aggregateWeight,
|
||||
TimeInMeshWeight: 0.0324,
|
||||
TimeInMeshQuantum: 1 * oneSlotDuration(),
|
||||
TimeInMeshCap: 300,
|
||||
FirstMessageDeliveriesWeight: 0.128,
|
||||
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
|
||||
TimeInMeshQuantum: inMeshTime(),
|
||||
TimeInMeshCap: inMeshCap(),
|
||||
FirstMessageDeliveriesWeight: firstMessageWeight,
|
||||
FirstMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()),
|
||||
FirstMessageDeliveriesCap: 179,
|
||||
MeshMessageDeliveriesWeight: -0.064,
|
||||
FirstMessageDeliveriesCap: firstMessageCap,
|
||||
MeshMessageDeliveriesWeight: meshWeight,
|
||||
MeshMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()),
|
||||
MeshMessageDeliveriesCap: float64(aggPerEpoch),
|
||||
MeshMessageDeliveriesThreshold: float64(aggPerEpoch / 50),
|
||||
MeshMessageDeliveriesCap: meshCap,
|
||||
MeshMessageDeliveriesThreshold: meshThreshold,
|
||||
MeshMessageDeliveriesWindow: 2 * time.Second,
|
||||
MeshMessageDeliveriesActivation: 32 * oneSlotDuration(),
|
||||
MeshFailurePenaltyWeight: -0.064,
|
||||
MeshMessageDeliveriesActivation: 1 * oneEpochDuration(),
|
||||
MeshFailurePenaltyWeight: meshWeight,
|
||||
MeshFailurePenaltyDecay: scoreDecay(1 * oneEpochDuration()),
|
||||
InvalidMessageDeliveriesWeight: -140.4475,
|
||||
InvalidMessageDeliveriesWeight: -maxScore() / aggregateWeight,
|
||||
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func defaultAggregateSubnetTopicParams(activeValidators uint64) *pubsub.TopicScoreParams {
|
||||
topicWeight := attestationTotalWeight / float64(params.BeaconNetworkConfig().AttestationSubnetCount)
|
||||
subnetWeight := activeValidators / params.BeaconNetworkConfig().AttestationSubnetCount
|
||||
minimumWeight := subnetWeight / 50
|
||||
func defaultAggregateSubnetTopicParams(activeValidators uint64) (*pubsub.TopicScoreParams, error) {
|
||||
subnetCount := params.BeaconNetworkConfig().AttestationSubnetCount
|
||||
// Get weight for each specific subnet.
|
||||
topicWeight := attestationTotalWeight / float64(subnetCount)
|
||||
subnetWeight := activeValidators / subnetCount
|
||||
// Determine the amount of validators expected in a subnet in a single slot.
|
||||
numPerSlot := time.Duration(subnetWeight / uint64(params.BeaconConfig().SlotsPerEpoch))
|
||||
comsPerSlot := committeeCountPerSlot(activeValidators)
|
||||
exceedsThreshold := comsPerSlot >= 2*params.BeaconNetworkConfig().AttestationSubnetCount/uint64(params.BeaconConfig().SlotsPerEpoch)
|
||||
exceedsThreshold := comsPerSlot >= 2*subnetCount/uint64(params.BeaconConfig().SlotsPerEpoch)
|
||||
firstDecay := time.Duration(1)
|
||||
meshDecay := time.Duration(4)
|
||||
if exceedsThreshold {
|
||||
firstDecay = 4
|
||||
meshDecay = 16
|
||||
}
|
||||
// Determine expected first deliveries based on the message rate.
|
||||
firstMessageCap, err := decayLimit(scoreDecay(firstDecay*oneEpochDuration()), float64(numPerSlot*2/gossipSubD))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
firstMessageWeight := maxFirstDeliveryScore / firstMessageCap
|
||||
// Determine expected mesh deliveries based on message rate applied with a dampening factor.
|
||||
meshThreshold, err := decayThreshold(scoreDecay(meshDecay*oneEpochDuration()), float64(numPerSlot)/dampeningFactor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
meshWeight := -scoreByWeight(topicWeight, meshThreshold)
|
||||
meshCap := 4 * meshThreshold
|
||||
if !meshDeliveryIsScored {
|
||||
// Set the mesh weight as zero as a temporary measure, so as to prevent
|
||||
// the average nodes from being penalised.
|
||||
meshWeight = 0
|
||||
}
|
||||
return &pubsub.TopicScoreParams{
|
||||
TopicWeight: topicWeight,
|
||||
TimeInMeshWeight: 0.0324,
|
||||
TimeInMeshQuantum: numPerSlot,
|
||||
TimeInMeshCap: 300,
|
||||
FirstMessageDeliveriesWeight: 0.955,
|
||||
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
|
||||
TimeInMeshQuantum: inMeshTime(),
|
||||
TimeInMeshCap: inMeshCap(),
|
||||
FirstMessageDeliveriesWeight: firstMessageWeight,
|
||||
FirstMessageDeliveriesDecay: scoreDecay(firstDecay * oneEpochDuration()),
|
||||
FirstMessageDeliveriesCap: 24,
|
||||
MeshMessageDeliveriesWeight: -37.55,
|
||||
FirstMessageDeliveriesCap: firstMessageCap,
|
||||
MeshMessageDeliveriesWeight: meshWeight,
|
||||
MeshMessageDeliveriesDecay: scoreDecay(meshDecay * oneEpochDuration()),
|
||||
MeshMessageDeliveriesCap: float64(subnetWeight),
|
||||
MeshMessageDeliveriesThreshold: float64(minimumWeight),
|
||||
MeshMessageDeliveriesCap: meshCap,
|
||||
MeshMessageDeliveriesThreshold: meshThreshold,
|
||||
MeshMessageDeliveriesWindow: 2 * time.Second,
|
||||
MeshMessageDeliveriesActivation: 17 * oneSlotDuration(),
|
||||
MeshFailurePenaltyWeight: -37.55,
|
||||
MeshMessageDeliveriesActivation: 1 * oneEpochDuration(),
|
||||
MeshFailurePenaltyWeight: meshWeight,
|
||||
MeshFailurePenaltyDecay: scoreDecay(meshDecay * oneEpochDuration()),
|
||||
InvalidMessageDeliveriesWeight: -4544,
|
||||
InvalidMessageDeliveriesWeight: -maxScore() / topicWeight,
|
||||
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func defaultAttesterSlashingTopicParams() *pubsub.TopicScoreParams {
|
||||
return &pubsub.TopicScoreParams{
|
||||
TopicWeight: attesterSlashingWeight,
|
||||
TimeInMeshWeight: 0.0324,
|
||||
TimeInMeshQuantum: 1 * oneSlotDuration(),
|
||||
TimeInMeshCap: 300,
|
||||
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
|
||||
TimeInMeshQuantum: inMeshTime(),
|
||||
TimeInMeshCap: inMeshCap(),
|
||||
FirstMessageDeliveriesWeight: 36,
|
||||
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
|
||||
FirstMessageDeliveriesCap: 1,
|
||||
@@ -219,9 +277,9 @@ func defaultAttesterSlashingTopicParams() *pubsub.TopicScoreParams {
|
||||
func defaultProposerSlashingTopicParams() *pubsub.TopicScoreParams {
|
||||
return &pubsub.TopicScoreParams{
|
||||
TopicWeight: proposerSlashingWeight,
|
||||
TimeInMeshWeight: 0.0324,
|
||||
TimeInMeshQuantum: 1 * oneSlotDuration(),
|
||||
TimeInMeshCap: 300,
|
||||
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
|
||||
TimeInMeshQuantum: inMeshTime(),
|
||||
TimeInMeshCap: inMeshCap(),
|
||||
FirstMessageDeliveriesWeight: 36,
|
||||
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
|
||||
FirstMessageDeliveriesCap: 1,
|
||||
@@ -241,9 +299,9 @@ func defaultProposerSlashingTopicParams() *pubsub.TopicScoreParams {
|
||||
func defaultVoluntaryExitTopicParams() *pubsub.TopicScoreParams {
|
||||
return &pubsub.TopicScoreParams{
|
||||
TopicWeight: voluntaryExitWeight,
|
||||
TimeInMeshWeight: 0.0324,
|
||||
TimeInMeshQuantum: 1 * oneSlotDuration(),
|
||||
TimeInMeshCap: 300,
|
||||
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
|
||||
TimeInMeshQuantum: inMeshTime(),
|
||||
TimeInMeshCap: inMeshCap(),
|
||||
FirstMessageDeliveriesWeight: 2,
|
||||
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
|
||||
FirstMessageDeliveriesCap: 5,
|
||||
@@ -268,11 +326,33 @@ func oneEpochDuration() time.Duration {
|
||||
return time.Duration(params.BeaconConfig().SlotsPerEpoch) * oneSlotDuration()
|
||||
}
|
||||
|
||||
// determines the decay rate from the provided time period till
|
||||
// the decayToZero value. Ex: ( 1 -> 0.01)
|
||||
func scoreDecay(totalDurationDecay time.Duration) float64 {
|
||||
numOfTimes := totalDurationDecay / oneSlotDuration()
|
||||
return math.Pow(decayToZero, 1/float64(numOfTimes))
|
||||
}
|
||||
|
||||
// is used to determine the threshold from the decay limit with
|
||||
// a provided growth rate. This applies the decay rate to a
|
||||
// computed limit.
|
||||
func decayThreshold(decayRate, rate float64) (float64, error) {
|
||||
d, err := decayLimit(decayRate, rate)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return d * decayRate, nil
|
||||
}
|
||||
|
||||
// decayLimit provides the value till which a decay process will
|
||||
// limit till provided with an expected growth rate.
|
||||
func decayLimit(decayRate, rate float64) (float64, error) {
|
||||
if 1 <= decayRate {
|
||||
return 0, errors.Errorf("got an invalid decayLimit rate: %f", decayRate)
|
||||
}
|
||||
return rate / (1 - decayRate), nil
|
||||
}
|
||||
|
||||
func committeeCountPerSlot(activeValidators uint64) uint64 {
|
||||
// Use a static parameter for now rather than a dynamic one, we can use
|
||||
// the actual parameter later when we have figured out how to fix a circular
|
||||
@@ -286,3 +366,40 @@ func aggregatorsPerSlot(activeValidators uint64) uint64 {
|
||||
totalAggs := comms * params.BeaconConfig().TargetAggregatorsPerCommittee
|
||||
return totalAggs
|
||||
}
|
||||
|
||||
// provides the relevant score by the provided weight and threshold.
|
||||
func scoreByWeight(weight float64, threshold float64) float64 {
|
||||
return maxScore() / (weight * threshold * threshold)
|
||||
}
|
||||
|
||||
// maxScore attainable by a peer.
|
||||
func maxScore() float64 {
|
||||
totalWeight := beaconBlockWeight + aggregateWeight + attestationTotalWeight +
|
||||
attesterSlashingWeight + proposerSlashingWeight + voluntaryExitWeight
|
||||
return (maxInMeshScore + maxFirstDeliveryScore) * totalWeight
|
||||
}
|
||||
|
||||
// denotes the unit time in mesh for scoring tallying.
|
||||
func inMeshTime() time.Duration {
|
||||
return 1 * oneSlotDuration()
|
||||
}
|
||||
|
||||
// the cap for `inMesh` time scoring.
|
||||
func inMeshCap() float64 {
|
||||
return float64((3600 * time.Second) / inMeshTime())
|
||||
}
|
||||
|
||||
func logGossipParameters(topic string, params *pubsub.TopicScoreParams) {
|
||||
// Exit early in the event the parameter struct is nil.
|
||||
if params == nil {
|
||||
return
|
||||
}
|
||||
rawParams := reflect.ValueOf(params).Elem()
|
||||
numOfFields := rawParams.NumField()
|
||||
|
||||
fields := make(logrus.Fields, numOfFields)
|
||||
for i := 0; i < numOfFields; i++ {
|
||||
fields[reflect.TypeOf(params).Elem().Field(i).Name] = rawParams.Field(i).Interface()
|
||||
}
|
||||
log.WithFields(fields).Debugf("Topic Parameters for %s", topic)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
@@ -60,3 +61,19 @@ func TestCorrect_ActiveValidatorsCount(t *testing.T) {
|
||||
assert.NoError(t, err, "genesis state not retrieved")
|
||||
assert.Equal(t, int(params.BeaconConfig().MinGenesisActiveValidatorCount)+100, int(vals), "mainnet genesis active count isn't accurate")
|
||||
}
|
||||
|
||||
func TestLoggingParameters(t *testing.T) {
|
||||
logGossipParameters("testing", nil)
|
||||
logGossipParameters("testing", &pubsub.TopicScoreParams{})
|
||||
// Test out actual gossip parameters.
|
||||
logGossipParameters("testing", defaultBlockTopicParams())
|
||||
p, err := defaultAggregateSubnetTopicParams(10000)
|
||||
assert.NoError(t, err)
|
||||
logGossipParameters("testing", p)
|
||||
p, err = defaultAggregateTopicParams(10000)
|
||||
assert.NoError(t, err)
|
||||
logGossipParameters("testing", p)
|
||||
logGossipParameters("testing", defaultAttesterSlashingTopicParams())
|
||||
logGossipParameters("testing", defaultProposerSlashingTopicParams())
|
||||
logGossipParameters("testing", defaultVoluntaryExitTopicParams())
|
||||
}
|
||||
|
||||
@@ -2,33 +2,11 @@ package p2p
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
)
|
||||
|
||||
const (
|
||||
// overlay parameters
|
||||
gossipSubD = 8 // topic stable mesh target count
|
||||
gossipSubDlo = 6 // topic stable mesh low watermark
|
||||
gossipSubDhi = 12 // topic stable mesh high watermark
|
||||
|
||||
// gossip parameters
|
||||
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
|
||||
gossipSubMcacheGossip = 3 // number of windows to gossip about
|
||||
gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs
|
||||
|
||||
// fanout ttl
|
||||
gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nano seconds
|
||||
|
||||
// heartbeat interval
|
||||
gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds
|
||||
|
||||
// misc
|
||||
randomSubD = 6 // random gossip target
|
||||
)
|
||||
|
||||
func TestOverlayParameters(t *testing.T) {
|
||||
setPubSubParameters()
|
||||
assert.Equal(t, gossipSubD, pubsub.GossipSubD, "gossipSubD")
|
||||
|
||||
@@ -15,6 +15,27 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
)
|
||||
|
||||
const (
|
||||
// overlay parameters
|
||||
gossipSubD = 8 // topic stable mesh target count
|
||||
gossipSubDlo = 6 // topic stable mesh low watermark
|
||||
gossipSubDhi = 12 // topic stable mesh high watermark
|
||||
|
||||
// gossip parameters
|
||||
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
|
||||
gossipSubMcacheGossip = 3 // number of windows to gossip about
|
||||
gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs
|
||||
|
||||
// fanout ttl
|
||||
gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nano seconds
|
||||
|
||||
// heartbeat interval
|
||||
gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds
|
||||
|
||||
// misc
|
||||
randomSubD = 6 // random gossip target
|
||||
)
|
||||
|
||||
// JoinTopic will join PubSub topic, if not already joined.
|
||||
func (s *Service) JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) {
|
||||
s.joinedTopicsLock.Lock()
|
||||
@@ -80,10 +101,12 @@ func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if scoringParams != nil {
|
||||
if err := topicHandle.SetScoreParams(scoringParams); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logGossipParameters(topic, scoringParams)
|
||||
}
|
||||
}
|
||||
return topicHandle.Subscribe(opts...)
|
||||
@@ -124,13 +147,12 @@ func msgIDFunction(pmsg *pubsub_pb.Message) string {
|
||||
}
|
||||
|
||||
func setPubSubParameters() {
|
||||
heartBeatInterval := 700 * time.Millisecond
|
||||
pubsub.GossipSubDlo = 6
|
||||
pubsub.GossipSubD = 8
|
||||
pubsub.GossipSubHeartbeatInterval = heartBeatInterval
|
||||
pubsub.GossipSubHistoryLength = 6
|
||||
pubsub.GossipSubHistoryGossip = 3
|
||||
pubsub.TimeCacheDuration = 550 * heartBeatInterval
|
||||
pubsub.GossipSubDlo = gossipSubDlo
|
||||
pubsub.GossipSubD = gossipSubD
|
||||
pubsub.GossipSubHeartbeatInterval = gossipSubHeartbeatInterval
|
||||
pubsub.GossipSubHistoryLength = gossipSubMcacheLen
|
||||
pubsub.GossipSubHistoryGossip = gossipSubMcacheGossip
|
||||
pubsub.TimeCacheDuration = 550 * gossipSubHeartbeatInterval
|
||||
|
||||
// Set a larger gossip history to ensure that slower
|
||||
// messages have a longer time to be propagated. This
|
||||
|
||||
Reference in New Issue
Block a user