Refactor scoring service (#7841)

* refactor scoring service

* fix anti-pattern issue

* add block providers bad peers detection tests

* check status when peer scoring is disabled

* more tests
This commit is contained in:
Victor Farazdagi
2020-11-17 18:28:13 +03:00
committed by GitHub
parent ad5151f25d
commit 2034c662af
8 changed files with 198 additions and 64 deletions

View File

@@ -1,18 +1,17 @@
package scorers
import (
"context"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
)
var _ Scorer = (*BadResponsesScorer)(nil)
const (
// DefaultBadResponsesThreshold defines how many bad responses to tolerate before peer is deemed bad.
DefaultBadResponsesThreshold = 6
// DefaultBadResponsesWeight is a default weight. Since score represents penalty, it has negative weight.
DefaultBadResponsesWeight = -1.0
// DefaultBadResponsesDecayInterval defines how often to decay previous statistics.
// Every interval bad responses counter will be decremented by 1.
DefaultBadResponsesDecayInterval = time.Hour
@@ -20,7 +19,6 @@ const (
// BadResponsesScorer represents bad responses scoring service.
type BadResponsesScorer struct {
ctx context.Context
config *BadResponsesScorerConfig
store *peerdata.Store
}
@@ -29,29 +27,22 @@ type BadResponsesScorer struct {
type BadResponsesScorerConfig struct {
// Threshold specifies number of bad responses tolerated, before peer is banned.
Threshold int
// Weight defines weight of bad response/threshold ratio on overall score.
Weight float64
// DecayInterval specifies how often bad response stats should be decayed.
DecayInterval time.Duration
}
// newBadResponsesScorer creates new bad responses scoring service.
func newBadResponsesScorer(
ctx context.Context, store *peerdata.Store, config *BadResponsesScorerConfig) *BadResponsesScorer {
func newBadResponsesScorer(store *peerdata.Store, config *BadResponsesScorerConfig) *BadResponsesScorer {
if config == nil {
config = &BadResponsesScorerConfig{}
}
scorer := &BadResponsesScorer{
ctx: ctx,
config: config,
store: store,
}
if scorer.config.Threshold == 0 {
scorer.config.Threshold = DefaultBadResponsesThreshold
}
if scorer.config.Weight == 0.0 {
scorer.config.Weight = DefaultBadResponsesWeight
}
if scorer.config.DecayInterval == 0 {
scorer.config.DecayInterval = DefaultBadResponsesDecayInterval
}
@@ -65,7 +56,7 @@ func (s *BadResponsesScorer) Score(pid peer.ID) float64 {
return s.score(pid)
}
// score is a lock-free version of ScoreBadResponses.
// score is a lock-free version of Score.
func (s *BadResponsesScorer) score(pid peer.ID) float64 {
score := float64(0)
peerData, ok := s.store.PeerData(pid)
@@ -74,7 +65,8 @@ func (s *BadResponsesScorer) score(pid peer.ID) float64 {
}
if peerData.BadResponses > 0 {
score = float64(peerData.BadResponses) / float64(s.config.Threshold)
score = score * s.config.Weight
// Since score represents a penalty, negate it.
score *= -1
}
return score
}
@@ -131,7 +123,7 @@ func (s *BadResponsesScorer) isBadPeer(pid peer.ID) bool {
return false
}
// BadPeers returns the peers that are bad.
// BadPeers returns the peers that are considered bad.
func (s *BadResponsesScorer) BadPeers() []peer.ID {
s.store.RLock()
defer s.store.RUnlock()

View File

@@ -86,7 +86,6 @@ func TestScorers_BadResponses_Decay(t *testing.T) {
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
Weight: 1,
},
},
})

View File

@@ -1,7 +1,6 @@
package scorers
import (
"context"
"fmt"
"math"
"sort"
@@ -15,6 +14,8 @@ import (
"github.com/prysmaticlabs/prysm/shared/timeutils"
)
var _ Scorer = (*BlockProviderScorer)(nil)
const (
// DefaultBlockProviderProcessedBatchWeight is a default reward weight of a processed batch of blocks.
DefaultBlockProviderProcessedBatchWeight = float64(0.1)
@@ -35,7 +36,6 @@ const (
// BlockProviderScorer represents block provider scoring service.
type BlockProviderScorer struct {
ctx context.Context
config *BlockProviderScorerConfig
store *peerdata.Store
// maxScore is a cached value for maximum attainable block provider score.
@@ -62,13 +62,11 @@ type BlockProviderScorerConfig struct {
}
// newBlockProviderScorer creates block provider scoring service.
func newBlockProviderScorer(
ctx context.Context, store *peerdata.Store, config *BlockProviderScorerConfig) *BlockProviderScorer {
func newBlockProviderScorer(store *peerdata.Store, config *BlockProviderScorerConfig) *BlockProviderScorer {
if config == nil {
config = &BlockProviderScorerConfig{}
}
scorer := &BlockProviderScorer{
ctx: ctx,
config: config,
store: store,
}
@@ -176,6 +174,20 @@ func (s *BlockProviderScorer) processedBlocks(pid peer.ID) uint64 {
return 0
}
// IsBadPeer states if the peer is to be considered bad.
// Block provider scorer cannot guarantee that lower score of a peer is indeed a sign of a bad peer.
// Therefore this scorer never marks peers as bad, and relies on scores to probabilistically sort
// out low-scorers (see WeightSorted method).
// The peer ID argument is ignored: the result is false for every peer, registered or not.
func (s *BlockProviderScorer) IsBadPeer(_ peer.ID) bool {
return false
}
// BadPeers returns the peers that are considered bad.
// No peers are considered bad by block providers scorer, so the result is always an
// empty (non-nil) slice.
func (s *BlockProviderScorer) BadPeers() []peer.ID {
return []peer.ID{}
}
// Decay updates block provider counters by decaying them.
// This urges peers to keep up the performance to continue getting a high score (and allows
// new peers to contest previously high scoring ones).

View File

@@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/timeutils"
@@ -438,17 +439,20 @@ func TestScorers_BlockProvider_FormatScorePretty(t *testing.T) {
},
}
peerStatusGen := func() *peers.Status {
return peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
ProcessedBlocksCap: 20 * batchSize,
Decay: 10 * batchSize,
},
},
})
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
ProcessedBlocksCap: 20 * batchSize,
Decay: 10 * batchSize,
},
},
})
peerStatuses := peerStatusGen()
scorer := peerStatuses.Scorers().BlockProviderScorer()
if tt.update != nil {
tt.update(scorer)
@@ -456,4 +460,29 @@ func TestScorers_BlockProvider_FormatScorePretty(t *testing.T) {
tt.check(scorer)
})
}
t.Run("peer scorer disabled", func(t *testing.T) {
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
EnablePeerScorer: false,
})
defer resetCfg()
peerStatuses := peerStatusGen()
scorer := peerStatuses.Scorers().BlockProviderScorer()
assert.Equal(t, "disabled", scorer.FormatScorePretty("peer1"))
})
}
// TestScorers_BlockProvider_BadPeerMarking checks that the block provider scorer never
// reports a peer as bad, neither before nor after the peer has processed blocks recorded.
func TestScorers_BlockProvider_BadPeerMarking(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Untyped constant, so it converts implicitly wherever a peer ID is expected.
	const pid = "peer1"

	status := peers.NewStatus(ctx, &peers.StatusConfig{
		ScorerParams: &scorers.Config{},
	})
	blockProvider := status.Scorers().BlockProviderScorer()

	// Peer that has not been registered yet.
	assert.Equal(t, false, blockProvider.IsBadPeer(pid), "Unexpected status for unregistered peer")

	// Peer with recorded activity.
	blockProvider.IncrementProcessedBlocks(pid, 64)
	assert.Equal(t, false, blockProvider.IsBadPeer(pid))
	assert.Equal(t, 0, len(blockProvider.BadPeers()))
}

View File

@@ -9,18 +9,28 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
)
var _ Scorer = (*Service)(nil)
// ScoreRoundingFactor defines how many digits to keep in decimal part.
// This parameter is used in math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor.
const ScoreRoundingFactor = 10000
// Scorer defines minimum set of methods every peer scorer must expose.
type Scorer interface {
// Score returns the score the scorer assigns to the given peer.
Score(pid peer.ID) float64
// IsBadPeer states if the scorer considers the given peer bad.
IsBadPeer(pid peer.ID) bool
// BadPeers returns the peers the scorer considers bad.
BadPeers() []peer.ID
}
// Service manages peer scorers that are used to calculate overall peer score.
type Service struct {
ctx context.Context
store *peerdata.Store
scorers struct {
badResponsesScorer *BadResponsesScorer
blockProviderScorer *BlockProviderScorer
}
weights map[Scorer]float64
totalWeight float64
}
// Config holds configuration parameters for scoring service.
@@ -32,12 +42,18 @@ type Config struct {
// NewService provides fully initialized peer scoring service.
func NewService(ctx context.Context, store *peerdata.Store, config *Config) *Service {
s := &Service{
ctx: ctx,
store: store,
store: store,
weights: make(map[Scorer]float64),
}
s.scorers.badResponsesScorer = newBadResponsesScorer(ctx, store, config.BadResponsesScorerConfig)
s.scorers.blockProviderScorer = newBlockProviderScorer(ctx, store, config.BlockProviderScorerConfig)
go s.loop(s.ctx)
// Register scorers.
s.scorers.badResponsesScorer = newBadResponsesScorer(store, config.BadResponsesScorerConfig)
s.setScorerWeight(s.scorers.badResponsesScorer, 1.0)
s.scorers.blockProviderScorer = newBlockProviderScorer(store, config.BlockProviderScorerConfig)
s.setScorerWeight(s.scorers.blockProviderScorer, 1.0)
// Start background tasks.
go s.loop(ctx)
return s
}
@@ -52,6 +68,17 @@ func (s *Service) BlockProviderScorer() *BlockProviderScorer {
return s.scorers.blockProviderScorer
}
// ActiveScorersCount returns the number of registered scorers whose weight is strictly
// positive, i.e. scorers that contribute to the overall score.
func (s *Service) ActiveScorersCount() int {
	var active int
	for _, weight := range s.weights {
		if !(weight > 0) {
			// Zero or negative weight: scorer is not counted as active.
			continue
		}
		active++
	}
	return active
}
// Score returns calculated peer score across all tracked metrics.
func (s *Service) Score(pid peer.ID) float64 {
s.store.RLock()
@@ -61,11 +88,37 @@ func (s *Service) Score(pid peer.ID) float64 {
if _, ok := s.store.PeerData(pid); !ok {
return 0
}
score += s.scorers.badResponsesScorer.score(pid)
score += s.scorers.blockProviderScorer.score(pid)
score += s.scorers.badResponsesScorer.score(pid) * s.scorerWeight(s.scorers.badResponsesScorer)
score += s.scorers.blockProviderScorer.score(pid) * s.scorerWeight(s.scorers.blockProviderScorer)
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
}
// IsBadPeer states if the peer is considered bad by the scoring service.
// The check itself is done by isBadPeer, which is called while the peer store's
// read lock is held.
func (s *Service) IsBadPeer(pid peer.ID) bool {
s.store.RLock()
defer s.store.RUnlock()
return s.isBadPeer(pid)
}
// isBadPeer is a lock-free version of IsBadPeer.
// It does no locking itself; IsBadPeer and BadPeers call it with the store's read lock held.
// Only the bad responses scorer is consulted when classifying the peer.
func (s *Service) isBadPeer(pid peer.ID) bool {
return s.scorers.badResponsesScorer.isBadPeer(pid)
}
// BadPeers collects every peer in the store that the scoring service classifies as bad.
// The store's read lock is held for the whole traversal. When no peer is bad, an empty
// (non-nil) slice is returned.
func (s *Service) BadPeers() []peer.ID {
	s.store.RLock()
	defer s.store.RUnlock()

	bad := []peer.ID{}
	for pid := range s.store.Peers() {
		if !s.isBadPeer(pid) {
			continue
		}
		bad = append(bad, pid)
	}
	return bad
}
// loop handles background tasks.
func (s *Service) loop(ctx context.Context) {
decayBadResponsesStats := time.NewTicker(s.scorers.badResponsesScorer.Params().DecayInterval)
@@ -84,3 +137,14 @@ func (s *Service) loop(ctx context.Context) {
}
}
}
// setScorerWeight registers scorer with the given weight, or updates the weight of an
// already registered scorer.
//
// totalWeight is kept equal to the sum of all registered weights: when a scorer is
// registered again, its previous weight is replaced rather than counted a second time,
// so the normalized weights returned by scorerWeight stay consistent.
func (s *Service) setScorerWeight(scorer Scorer, weight float64) {
	// A missing key reads as 0, so a first-time registration adds the full weight.
	s.totalWeight += weight - s.weights[scorer]
	s.weights[scorer] = weight
}
// scorerWeight calculates the share of a given scorer in the total score, as the
// scorer's weight divided by the sum of all registered weights.
//
// When the total weight is zero (no scorers registered, or all weights cancel out),
// 0 is returned instead of dividing by zero, so that NaN/Inf never leaks into the
// overall peer score. A scorer that was never registered also yields 0.
func (s *Service) scorerWeight(scorer Scorer) float64 {
	if s.totalWeight == 0 {
		return 0
	}
	return s.weights[scorer] / s.totalWeight
}

View File

@@ -28,8 +28,8 @@ func TestScorers_Service_Init(t *testing.T) {
t.Run("bad responses scorer", func(t *testing.T) {
params := peerStatuses.Scorers().BadResponsesScorer().Params()
assert.Equal(t, scorers.DefaultBadResponsesThreshold, params.Threshold, "Unexpected threshold value")
assert.Equal(t, scorers.DefaultBadResponsesWeight, params.Weight, "Unexpected weight value")
assert.Equal(t, scorers.DefaultBadResponsesDecayInterval, params.DecayInterval, "Unexpected decay interval value")
assert.Equal(t, scorers.DefaultBadResponsesDecayInterval,
params.DecayInterval, "Unexpected decay interval value")
})
t.Run("block providers scorer", func(t *testing.T) {
@@ -48,7 +48,6 @@ func TestScorers_Service_Init(t *testing.T) {
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 2,
Weight: -1,
DecayInterval: 1 * time.Minute,
},
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
@@ -64,7 +63,6 @@ func TestScorers_Service_Init(t *testing.T) {
t.Run("bad responses scorer", func(t *testing.T) {
params := peerStatuses.Scorers().BadResponsesScorer().Params()
assert.Equal(t, 2, params.Threshold, "Unexpected threshold value")
assert.Equal(t, -1.0, params.Weight, "Unexpected weight value")
assert.Equal(t, 1*time.Minute, params.DecayInterval, "Unexpected decay interval value")
})
@@ -119,7 +117,8 @@ func TestScorers_Service_Score(t *testing.T) {
for _, pid := range pids {
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
// Not yet used peer gets boosted score.
assert.Equal(t, s.BlockProviderScorer().MaxScore(), s.Score(pid), "Unexpected score for not yet used peer")
startScore := s.BlockProviderScorer().MaxScore()
assert.Equal(t, startScore/float64(s.ActiveScorersCount()), s.Score(pid), "Unexpected score for not yet used peer")
}
return s, pids
}
@@ -136,27 +135,29 @@ func TestScorers_Service_Score(t *testing.T) {
t.Run("bad responses score", func(t *testing.T) {
s, pids := setupScorer()
zeroScore := s.BlockProviderScorer().MaxScore()
// Peers start with boosted start score (new peers are boosted by block provider).
startScore := s.BlockProviderScorer().MaxScore() / float64(s.ActiveScorersCount())
penalty := (-1 / float64(s.BadResponsesScorer().Params().Threshold)) / float64(s.ActiveScorersCount())
// Update peers' stats and test the effect on peer order.
s.BadResponsesScorer().Increment("peer2")
assert.DeepEqual(t, pack(s, zeroScore, zeroScore-0.2, zeroScore), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, startScore, startScore+penalty, startScore), peerScores(s, pids))
s.BadResponsesScorer().Increment("peer1")
s.BadResponsesScorer().Increment("peer1")
assert.DeepEqual(t, pack(s, zeroScore-0.4, zeroScore-0.2, zeroScore), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, startScore+2*penalty, startScore+penalty, startScore), peerScores(s, pids))
// See how decaying affects order of peers.
s.BadResponsesScorer().Decay()
assert.DeepEqual(t, pack(s, zeroScore-0.2, zeroScore, zeroScore), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, startScore+penalty, startScore, startScore), peerScores(s, pids))
s.BadResponsesScorer().Decay()
assert.DeepEqual(t, pack(s, zeroScore, zeroScore, zeroScore), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, startScore, startScore, startScore), peerScores(s, pids))
})
t.Run("block providers score", func(t *testing.T) {
s, pids := setupScorer()
s1 := s.BlockProviderScorer()
zeroScore := s.BlockProviderScorer().MaxScore()
batchWeight := s1.Params().ProcessedBatchWeight
startScore := s.BlockProviderScorer().MaxScore() / 2
batchWeight := s1.Params().ProcessedBatchWeight / 2
// Partial batch.
s1.IncrementProcessedBlocks("peer1", batchSize/4)
@@ -164,11 +165,11 @@ func TestScorers_Service_Score(t *testing.T) {
// Single batch.
s1.IncrementProcessedBlocks("peer1", batchSize)
assert.DeepEqual(t, pack(s, batchWeight, zeroScore, zeroScore), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, batchWeight, startScore, startScore), peerScores(s, pids), "Unexpected scores")
// Multiple batches.
s1.IncrementProcessedBlocks("peer2", batchSize*4)
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, zeroScore), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, startScore), peerScores(s, pids), "Unexpected scores")
// Partial batch.
s1.IncrementProcessedBlocks("peer3", batchSize/2)
@@ -187,25 +188,22 @@ func TestScorers_Service_Score(t *testing.T) {
})
t.Run("overall score", func(t *testing.T) {
// Full score, no penalty.
s, _ := setupScorer()
s1 := s.BlockProviderScorer()
s2 := s.BadResponsesScorer()
batchWeight := s1.Params().ProcessedBatchWeight
batchWeight := s1.Params().ProcessedBatchWeight / float64(s.ActiveScorersCount())
penalty := (-1 / float64(s.BadResponsesScorer().Params().Threshold)) / float64(s.ActiveScorersCount())
// Full score, no penalty.
s1.IncrementProcessedBlocks("peer1", batchSize*5)
assert.Equal(t, roundScore(batchWeight*5), s1.Score("peer1"))
assert.Equal(t, roundScore(batchWeight*5), s.Score("peer1"))
// Now, adjust score by introducing penalty for bad responses.
s2.Increment("peer1")
s2.Increment("peer1")
assert.Equal(t, -0.4, s2.Score("peer1"), "Unexpected bad responses score")
assert.Equal(t, roundScore(batchWeight*5), s1.Score("peer1"), "Unexpected block provider score")
assert.Equal(t, roundScore(batchWeight*5-0.4), s.Score("peer1"), "Unexpected overall score")
assert.Equal(t, roundScore(batchWeight*5+2*penalty), s.Score("peer1"), "Unexpected overall score")
// If peer continues to misbehave, score becomes negative.
s2.Increment("peer1")
assert.Equal(t, -0.6, s2.Score("peer1"), "Unexpected bad responses score")
assert.Equal(t, roundScore(batchWeight*5), s1.Score("peer1"), "Unexpected block provider score")
assert.Equal(t, roundScore(batchWeight*5-0.6), s.Score("peer1"), "Unexpected overall score")
assert.Equal(t, roundScore(batchWeight*5+3*penalty), s.Score("peer1"), "Unexpected overall score")
})
}
@@ -218,7 +216,6 @@ func TestScorers_Service_loop(t *testing.T) {
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 5,
Weight: -0.5,
DecayInterval: 50 * time.Millisecond,
},
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
@@ -264,3 +261,45 @@ func TestScorers_Service_loop(t *testing.T) {
assert.Equal(t, false, s1.IsBadPeer(pid1), "Peer should not be marked as bad")
assert.Equal(t, uint64(0), s2.ProcessedBlocks("peer1"), "No blocks are expected")
}
// TestScorers_Service_IsBadPeer checks that the scoring service reports a peer as bad
// once its bad responses counter reaches the configured threshold.
func TestScorers_Service_IsBadPeer(t *testing.T) {
	// Use a cancellable context so that background work started by the scoring
	// service with this context can be stopped when the test ends.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
		PeerLimit: 30,
		ScorerParams: &scorers.Config{
			BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
				Threshold:     2,
				DecayInterval: 50 * time.Second,
			},
		},
	})

	// Unknown peer is not bad.
	assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer1"))

	// Two bad responses reach the threshold of 2.
	peerStatuses.Scorers().BadResponsesScorer().Increment("peer1")
	peerStatuses.Scorers().BadResponsesScorer().Increment("peer1")
	assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer1"))
}
// TestScorers_Service_BadPeers checks that BadPeers lists exactly the peers whose bad
// responses counter has reached the configured threshold.
func TestScorers_Service_BadPeers(t *testing.T) {
	// Use a cancellable context so that background work started by the scoring
	// service with this context can be stopped when the test ends.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
		PeerLimit: 30,
		ScorerParams: &scorers.Config{
			BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
				Threshold:     2,
				DecayInterval: 50 * time.Second,
			},
		},
	})

	// Initially nobody is bad.
	assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer1"))
	assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer2"))
	assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer3"))
	assert.Equal(t, 0, len(peerStatuses.Scorers().BadPeers()))

	// Push peer1 and peer3 up to the threshold; peer2 stays untouched.
	for _, pid := range []peer.ID{"peer1", "peer3"} {
		peerStatuses.Scorers().BadResponsesScorer().Increment(pid)
		peerStatuses.Scorers().BadResponsesScorer().Increment(pid)
	}
	assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer1"))
	assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer2"))
	assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer3"))
	assert.Equal(t, 2, len(peerStatuses.Scorers().BadPeers()))
}

View File

@@ -280,7 +280,7 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
// IsBad states if the peer is to be considered bad.
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
func (p *Status) IsBad(pid peer.ID) bool {
return p.scorers.BadResponsesScorer().IsBadPeer(pid)
return p.scorers.IsBadPeer(pid)
}
// NextValidTime gets the earliest possible time it is to contact/dial

View File

@@ -179,7 +179,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
Weight: -100,
DecayInterval: time.Hour,
},
},