Enable Gossip Scoring (#8730)

* enable gossip scoring

* fix some tests

* fix  up

* handle too small numbers

* add caching to validator count

* fix up

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das
2021-04-13 10:20:13 +08:00
committed by GitHub
parent a91f2688f1
commit 6aa1297829
5 changed files with 56 additions and 29 deletions

View File

@@ -289,9 +289,9 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
pubsub.WithStrictSignatureVerification(false),
)
require.NoError(t, err)
p := &Service{
host: hosts[0],
ctx: context.Background(),
pubsub: ps1,
dv5Listener: listeners[0],
joinedTopics: map[string]*pubsub.Topic{},
@@ -307,6 +307,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
p2 := &Service{
host: hosts[1],
ctx: context.Background(),
pubsub: ps2,
dv5Listener: listeners[1],
joinedTopics: map[string]*pubsub.Topic{},
@@ -329,7 +330,11 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
// External peer subscribes to the topic.
topic += p.Encoding().ProtocolSuffix()
sub, err := p2.SubscribeToTopic(topic)
// We dont use our internal subscribe method
// due to using floodsub over here.
tpHandle, err := p2.JoinTopic(topic)
require.NoError(t, err)
sub, err := tpHandle.Subscribe()
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
@@ -339,7 +344,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
wg.Add(1)
go func(tt *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
incomingMessage, err := sub.Next(ctx)
@@ -354,7 +359,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
// Broadcast to peers and wait.
require.NoError(t, p.BroadcastAttestation(context.Background(), subnet, msg))
if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Error("Failed to receive pubsub within 5s")
if testutil.WaitTimeout(&wg, 4*time.Second) {
t.Error("Failed to receive pubsub within 4s")
}
}

View File

@@ -104,6 +104,9 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro
}
func (s *Service) retrieveActiveValidators() (uint64, error) {
if s.activeValidatorCount != 0 {
return s.activeValidatorCount, nil
}
rt := s.cfg.DB.LastArchivedRoot(s.ctx)
if rt == params.BeaconConfig().ZeroHash {
genState, err := s.cfg.DB.GenesisState(s.ctx)
@@ -113,7 +116,13 @@ func (s *Service) retrieveActiveValidators() (uint64, error) {
if genState == nil {
return 0, errors.New("no genesis state exists")
}
return helpers.ActiveValidatorCount(genState, helpers.CurrentEpoch(genState))
activeVals, err := helpers.ActiveValidatorCount(genState, helpers.CurrentEpoch(genState))
if err != nil {
return 0, err
}
// Cache active validator count
s.activeValidatorCount = activeVals
return activeVals, nil
}
bState, err := s.cfg.DB.State(s.ctx, rt)
if err != nil {
@@ -122,7 +131,13 @@ func (s *Service) retrieveActiveValidators() (uint64, error) {
if bState == nil {
return 0, errors.Errorf("no state with root %#x exists", rt)
}
return helpers.ActiveValidatorCount(bState, helpers.CurrentEpoch(bState))
activeVals, err := helpers.ActiveValidatorCount(bState, helpers.CurrentEpoch(bState))
if err != nil {
return 0, err
}
// Cache active validator count
s.activeValidatorCount = activeVals
return activeVals, nil
}
// Based on the lighthouse parameters.
@@ -163,12 +178,14 @@ func defaultAggregateTopicParams(activeValidators uint64) (*pubsub.TopicScorePar
aggPerSlot := aggregatorsPerSlot(activeValidators)
firstMessageCap, err := decayLimit(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot*2/gossipSubD))
if err != nil {
return nil, err
log.Warnf("skipping initializing topic scoring: %v", err)
return nil, nil
}
firstMessageWeight := maxFirstDeliveryScore / firstMessageCap
meshThreshold, err := decayThreshold(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot)/dampeningFactor)
if err != nil {
return nil, err
log.Warnf("skipping initializing topic scoring: %v", err)
return nil, nil
}
meshWeight := -scoreByWeight(aggregateWeight, meshThreshold)
meshCap := 4 * meshThreshold
@@ -203,8 +220,16 @@ func defaultAggregateSubnetTopicParams(activeValidators uint64) (*pubsub.TopicSc
// Get weight for each specific subnet.
topicWeight := attestationTotalWeight / float64(subnetCount)
subnetWeight := activeValidators / subnetCount
if subnetWeight == 0 {
log.Warn("Subnet weight is 0, skipping initializing topic scoring")
return nil, nil
}
// Determine the amount of validators expected in a subnet in a single slot.
numPerSlot := time.Duration(subnetWeight / uint64(params.BeaconConfig().SlotsPerEpoch))
if numPerSlot == 0 {
log.Warn("numPerSlot is 0, skipping initializing topic scoring")
return nil, nil
}
comsPerSlot := committeeCountPerSlot(activeValidators)
exceedsThreshold := comsPerSlot >= 2*subnetCount/uint64(params.BeaconConfig().SlotsPerEpoch)
firstDecay := time.Duration(1)
@@ -216,13 +241,15 @@ func defaultAggregateSubnetTopicParams(activeValidators uint64) (*pubsub.TopicSc
// Determine expected first deliveries based on the message rate.
firstMessageCap, err := decayLimit(scoreDecay(firstDecay*oneEpochDuration()), float64(numPerSlot*2/gossipSubD))
if err != nil {
return nil, err
log.Warnf("skipping initializing topic scoring: %v", err)
return nil, nil
}
firstMessageWeight := maxFirstDeliveryScore / firstMessageCap
// Determine expected mesh deliveries based on message rate applied with a dampening factor.
meshThreshold, err := decayThreshold(scoreDecay(meshDecay*oneEpochDuration()), float64(numPerSlot)/dampeningFactor)
if err != nil {
return nil, err
log.Warnf("skipping initializing topic scoring: %v", err)
return nil, nil
}
meshWeight := -scoreByWeight(topicWeight, meshThreshold)
meshCap := 4 * meshThreshold

View File

@@ -55,6 +55,8 @@ func TestCorrect_ActiveValidatorsCount(t *testing.T) {
}
require.NoError(t, bState.SetSlot(10000))
require.NoError(t, db.SaveState(s.ctx, bState, [32]byte{'a'}))
// Reset count
s.activeValidatorCount = 0
// Retrieve last archived state.
vals, err = s.retrieveActiveValidators()

View File

@@ -96,18 +96,16 @@ func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub
if err != nil {
return nil, err
}
if featureconfig.Get().EnablePeerScorer {
scoringParams, err := s.topicScoreParams(topic)
if err != nil {
scoringParams, err := s.topicScoreParams(topic)
if err != nil {
return nil, err
}
if scoringParams != nil {
if err := topicHandle.SetScoreParams(scoringParams); err != nil {
return nil, err
}
if scoringParams != nil {
if err := topicHandle.SetScoreParams(scoringParams); err != nil {
return nil, err
}
logGossipParameters(topic, scoringParams)
}
logGossipParameters(topic, scoringParams)
}
return topicHandle.Subscribe(opts...)
}

View File

@@ -29,7 +29,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
@@ -80,6 +79,7 @@ type Service struct {
host host.Host
genesisTime time.Time
genesisValidatorsRoot []byte
activeValidatorCount uint64
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
@@ -142,13 +142,8 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
pubsub.WithSubscriptionFilter(s),
pubsub.WithPeerOutboundQueueSize(256),
pubsub.WithValidateQueueSize(256),
}
// Add gossip scoring options.
if featureconfig.Get().EnablePeerScorer {
psOpts = append(
psOpts,
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute))
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
}
// Set the pubsub global parameters that we require.
setPubSubParameters()