Adds block provider scorer (#6756)

* prepares peer scorer collection

* decouples scoring service into separate object

* updates references

* renames package

* removes redundant test init

* gazelle

* gofmt

* updates comment

* fix build

* adds block provider scorer

* score rounding factor constant (per Nishant's suggestion)

* updates penalty applying

* updates score block provider tests

* updates scorer tests

* expand test suite

* get rid of penalties + counters for requested/returned blocks

* removes start score

* fixes provider test

* fixes scorer manager tests

* updates comments

* moves roundScore test function

* maxscore tests

* improves test coverage

* Update beacon-chain/p2p/peers/score_block_providers.go

Co-authored-by: Nishant Das <nishdas93@gmail.com>

* renames var to make it less ambiguous - per Nishant's suggestion

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Nishant Das <nishdas93@gmail.com>
Victor Farazdagi
2020-08-01 12:20:05 +03:00
committed by GitHub
parent 0e1fb65476
commit bf59530d93
7 changed files with 694 additions and 26 deletions
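
At its core, the new scorer pays a fixed reward per fully processed batch of blocks: score = float64(processedBlocks / batchSize) * ProcessedBatchWeight, where integer division means partial batches earn nothing, and the result is rounded to four decimal places. A minimal standalone sketch of that arithmetic (the helper name is illustrative and not part of the commit; the 64-block batch size and 0.05 weight match the defaults used in the tests below):

package main

import (
	"fmt"
	"math"
)

// blockProviderScore mirrors the arithmetic of BlockProviderScorer.score:
// integer division counts only fully processed batches, and the result is
// rounded with the same factor the scorer manager uses (10000).
func blockProviderScore(processedBlocks, batchSize uint64, batchWeight float64) float64 {
	const scoreRoundingFactor = 10000
	score := float64(processedBlocks/batchSize) * batchWeight
	return math.Round(score*scoreRoundingFactor) / scoreRoundingFactor
}

func main() {
	fmt.Println(blockProviderScore(64*13, 64, 0.05)) // 13 full batches -> 0.65
	fmt.Println(blockProviderScore(32, 64, 0.05))    // partial batch   -> 0
}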

beacon-chain/p2p/peers/BUILD.bazel

@@ -5,6 +5,7 @@ go_library(
name = "go_default_library",
srcs = [
"score_bad_responses.go",
"score_block_providers.go",
"scorer_manager.go",
"status.go",
"store.go",
@@ -13,6 +14,7 @@ go_library(
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/flags:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
@@ -29,13 +31,17 @@ go_test(
name = "go_default_test",
srcs = [
"benchmark_test.go",
"peers_test.go",
"score_bad_responses_test.go",
"score_block_providers_test.go",
"scorer_manager_test.go",
"status_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/flags:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
@@ -44,5 +50,6 @@ go_test(
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

beacon-chain/p2p/peers/peers_test.go

@@ -0,0 +1,36 @@
package peers_test
import (
"io/ioutil"
"math"
"os"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/sirupsen/logrus"
)
func TestMain(m *testing.M) {
logrus.SetLevel(logrus.DebugLevel)
logrus.SetOutput(ioutil.Discard)
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{})
defer resetCfg()
resetFlags := flags.Get()
flags.Init(&flags.GlobalFlags{
BlockBatchLimit: 64,
BlockBatchLimitBurstFactor: 10,
})
defer func() {
flags.Init(resetFlags)
}()
os.Exit(m.Run())
}
// roundScore returns the score rounded in accordance with the scorer manager's rounding factor.
func roundScore(score float64) float64 {
return math.Round(score*peers.ScoreRoundingFactor) / peers.ScoreRoundingFactor
}
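
The roundScore helper above mirrors the rounding applied inside the scorers, so test expectations are computed exactly the way the service computes them. A quick sketch of its effect, assuming the exported ScoreRoundingFactor of 10000 (i.e. four decimal digits are kept):

package main

import (
	"fmt"
	"math"
)

func main() {
	const scoreRoundingFactor = 10000 // same value the peers package exports
	roundScore := func(score float64) float64 {
		return math.Round(score*scoreRoundingFactor) / scoreRoundingFactor
	}
	fmt.Println(roundScore(2.0 / 3.0)) // fractional score kept to 4 digits: 0.6667
	fmt.Println(roundScore(0.05 * 13)) // already short: 0.65
}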

beacon-chain/p2p/peers/score_block_providers.go

@@ -0,0 +1,179 @@
package peers
import (
"context"
"fmt"
"math"
"sort"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
)
const (
// DefaultBlockProviderProcessedBatchWeight is the default reward weight for a single processed batch of blocks.
DefaultBlockProviderProcessedBatchWeight = 0.05
// DefaultBlockProviderDecayInterval defines how often the decaying routine is called.
DefaultBlockProviderDecayInterval = 1 * time.Minute
)
// BlockProviderScorer represents the block provider scoring service.
type BlockProviderScorer struct {
ctx context.Context
config *BlockProviderScorerConfig
store *peerDataStore
// highestProcessedBlocksCount is the highest number of processed blocks attained by any peer during
// the lifetime of the service. It is used to gauge the relative performance of other peers.
highestProcessedBlocksCount uint64
}
// BlockProviderScorerConfig holds configuration parameters for the block provider scoring service.
type BlockProviderScorerConfig struct {
// ProcessedBatchWeight defines a reward for a single processed batch of blocks.
ProcessedBatchWeight float64
// DecayInterval defines how often stats should be decayed.
DecayInterval time.Duration
// Decay specifies the number of blocks subtracted from stats on each decay step.
Decay uint64
}
// newBlockProviderScorer creates a new block provider scoring service.
func newBlockProviderScorer(
ctx context.Context, store *peerDataStore, config *BlockProviderScorerConfig) *BlockProviderScorer {
if config == nil {
config = &BlockProviderScorerConfig{}
}
scorer := &BlockProviderScorer{
ctx: ctx,
config: config,
store: store,
}
if scorer.config.ProcessedBatchWeight == 0.0 {
scorer.config.ProcessedBatchWeight = DefaultBlockProviderProcessedBatchWeight
}
if scorer.config.DecayInterval == 0 {
scorer.config.DecayInterval = DefaultBlockProviderDecayInterval
}
batchSize := uint64(flags.Get().BlockBatchLimit)
scorer.highestProcessedBlocksCount = batchSize
if scorer.config.Decay == 0 {
scorer.config.Decay = batchSize
}
return scorer
}
// Score calculates and returns the block provider score.
func (s *BlockProviderScorer) Score(pid peer.ID) float64 {
s.store.RLock()
defer s.store.RUnlock()
return s.score(pid)
}
// score is a lock-free version of Score.
func (s *BlockProviderScorer) score(pid peer.ID) float64 {
score := float64(0)
peerData, ok := s.store.peers[pid]
if !ok {
return score
}
batchSize := uint64(flags.Get().BlockBatchLimit)
if batchSize > 0 {
processedBatches := float64(peerData.processedBlocks / batchSize)
score += processedBatches * s.config.ProcessedBatchWeight
}
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
}
// Params exposes scorer's parameters.
func (s *BlockProviderScorer) Params() *BlockProviderScorerConfig {
return s.config
}
// IncrementProcessedBlocks increments the number of blocks that have been successfully processed.
func (s *BlockProviderScorer) IncrementProcessedBlocks(pid peer.ID, cnt uint64) {
s.store.Lock()
defer s.store.Unlock()
if cnt == 0 {
return
}
if _, ok := s.store.peers[pid]; !ok {
s.store.peers[pid] = &peerData{}
}
s.store.peers[pid].processedBlocks += cnt
if s.store.peers[pid].processedBlocks > s.highestProcessedBlocksCount {
s.highestProcessedBlocksCount = s.store.peers[pid].processedBlocks
}
}
// ProcessedBlocks returns the number of blocks returned by a peer and successfully processed.
func (s *BlockProviderScorer) ProcessedBlocks(pid peer.ID) uint64 {
s.store.RLock()
defer s.store.RUnlock()
return s.processedBlocks(pid)
}
// processedBlocks is a lock-free version of ProcessedBlocks.
func (s *BlockProviderScorer) processedBlocks(pid peer.ID) uint64 {
if peerData, ok := s.store.peers[pid]; ok {
return peerData.processedBlocks
}
return 0
}
// Decay updates block provider counters by decaying them.
// This encourages peers to maintain their performance in order to keep a high score (and allows
// new peers to contest previously high-scoring ones).
func (s *BlockProviderScorer) Decay() {
s.store.Lock()
defer s.store.Unlock()
for _, peerData := range s.store.peers {
if peerData.processedBlocks > s.config.Decay {
peerData.processedBlocks -= s.config.Decay
} else {
peerData.processedBlocks = 0
}
}
}
// Sorted returns a list of block providers sorted by score in descending order.
func (s *BlockProviderScorer) Sorted(pids []peer.ID) []peer.ID {
s.store.Lock()
defer s.store.Unlock()
if len(pids) == 0 {
return pids
}
scores := make(map[peer.ID]float64, len(pids))
peers := make([]peer.ID, len(pids))
for i, pid := range pids {
scores[pid] = s.score(pid)
peers[i] = pid
}
sort.Slice(peers, func(i, j int) bool {
return scores[peers[i]] > scores[peers[j]]
})
return peers
}
// FormatScorePretty returns full scoring information in a human-readable format.
func (s *BlockProviderScorer) FormatScorePretty(pid peer.ID) string {
s.store.RLock()
defer s.store.RUnlock()
score := s.score(pid)
return fmt.Sprintf("[%0.1f%%, raw: %v, blocks: %d/%d]",
(score/s.MaxScore())*100, score, s.processedBlocks(pid), s.highestProcessedBlocksCount)
}
// MaxScore exposes the maximum score attainable by any peer.
func (s *BlockProviderScorer) MaxScore() float64 {
score := s.Params().ProcessedBatchWeight
batchSize := uint64(flags.Get().BlockBatchLimit)
if batchSize > 0 {
totalProcessedBatches := float64(s.highestProcessedBlocksCount / batchSize)
score = totalProcessedBatches * s.config.ProcessedBatchWeight
}
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
}
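
Two details of the implementation above are worth tracing through: Decay subtracts a fixed number of blocks from every peer's counter (floored at zero), while highestProcessedBlocksCount is never decayed, so MaxScore keeps reflecting the best performance seen during the service's lifetime. A small sketch of those dynamics, assuming a 64-block batch size and the default Decay of one batch:

package main

import "fmt"

func main() {
	const batchSize, decay uint64 = 64, 64
	peer1, peer2 := batchSize*5, batchSize/2 // 320 and 32 processed blocks
	highest := peer1                         // max attained count is retained
	for i := 0; i < 3; i++ {                 // three decay intervals pass
		if peer1 > decay {
			peer1 -= decay
		} else {
			peer1 = 0
		}
		if peer2 > decay {
			peer2 -= decay
		} else {
			peer2 = 0
		}
	}
	fmt.Println(peer1, peer2, highest) // 128 0 320: counters shrink, MaxScore does not
}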

beacon-chain/p2p/peers/score_block_providers_test.go

@@ -0,0 +1,337 @@
package peers_test
import (
"context"
"fmt"
"testing"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestPeerScorer_BlockProvider_Score(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
},
},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
batchSize := uint64(flags.Get().BlockBatchLimit)
// Start with a non-existent provider.
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score for unregistered provider")
// Increment by zero (provider is added to the cache, but the score is unchanged).
scorer.IncrementProcessedBlocks("peer1", 0)
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score for registered provider")
// Partial score (less than a single batch of blocks processed).
scorer.IncrementProcessedBlocks("peer1", batchSize/2)
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score")
// Single batch.
scorer.IncrementProcessedBlocks("peer1", batchSize)
assert.Equal(t, roundScore(0.05), scorer.Score("peer1"), "Unexpected score")
// Multiple batches.
scorer.IncrementProcessedBlocks("peer2", batchSize*13)
assert.Equal(t, roundScore(0.05*13), scorer.Score("peer2"), "Unexpected score")
}
func TestPeerScorer_BlockProvider_GettersSetters(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
assert.Equal(t, uint64(0), scorer.ProcessedBlocks("peer1"), "Unexpected count for unregistered peer")
scorer.IncrementProcessedBlocks("peer1", 64)
assert.Equal(t, uint64(64), scorer.ProcessedBlocks("peer1"))
}
func TestPeerScorer_BlockProvider_Sorted(t *testing.T) {
batchSize := uint64(flags.Get().BlockBatchLimit)
tests := []struct {
name string
update func(s *peers.BlockProviderScorer)
have []peer.ID
want []peer.ID
}{
{
name: "no peers",
update: func(s *peers.BlockProviderScorer) {},
have: []peer.ID{},
want: []peer.ID{},
},
{
name: "same scores",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", 16)
s.IncrementProcessedBlocks("peer2", 16)
s.IncrementProcessedBlocks("peer3", 16)
},
have: []peer.ID{"peer1", "peer2", "peer3"},
want: []peer.ID{"peer1", "peer2", "peer3"},
},
{
name: "same scores multiple batches",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*7+16)
s.IncrementProcessedBlocks("peer2", batchSize*7+16)
s.IncrementProcessedBlocks("peer3", batchSize*7+16)
},
have: []peer.ID{"peer1", "peer2", "peer3"},
want: []peer.ID{"peer1", "peer2", "peer3"},
},
{
name: "same scores multiple batches unequal blocks",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*7+6)
s.IncrementProcessedBlocks("peer2", batchSize*7+16)
s.IncrementProcessedBlocks("peer3", batchSize*7+26)
},
have: []peer.ID{"peer1", "peer2", "peer3"},
want: []peer.ID{"peer1", "peer2", "peer3"},
},
{
name: "different scores",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*3)
s.IncrementProcessedBlocks("peer2", batchSize*1)
s.IncrementProcessedBlocks("peer3", batchSize*2)
},
have: []peer.ID{"peer3", "peer2", "peer1"},
want: []peer.ID{"peer1", "peer3", "peer2"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.2,
},
},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
tt.update(scorer)
assert.DeepEqual(t, tt.want, scorer.Sorted(tt.have))
})
}
}
func TestPeerScorer_BlockProvider_MaxScore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
batchSize := uint64(flags.Get().BlockBatchLimit)
tests := []struct {
name string
update func(s *peers.BlockProviderScorer)
want float64
}{
{
// The minimal max score is the reward for a single batch.
name: "no peers",
update: nil,
want: 0.2,
},
{
name: "partial batch",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize/4)
},
want: 0.2,
},
{
name: "single batch",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize)
},
want: 0.2,
},
{
name: "3/2 of a batch",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*3/2)
},
want: 0.2,
},
{
name: "multiple batches",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
},
want: 0.2 * 5,
},
{
name: "multiple peers",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
s.IncrementProcessedBlocks("peer1", batchSize)
s.IncrementProcessedBlocks("peer2", batchSize*10)
s.IncrementProcessedBlocks("peer1", batchSize/4)
},
want: 0.2 * 10,
},
{
// Even after stats are decayed, the maximum attained block count must remain
// (as a ballpark measure of overall peer performance during the service's lifetime).
name: "decaying",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
s.IncrementProcessedBlocks("peer1", batchSize)
s.IncrementProcessedBlocks("peer2", batchSize*10)
s.IncrementProcessedBlocks("peer1", batchSize/4)
for i := 0; i < 10; i++ {
s.Decay()
}
assert.Equal(t, uint64(0), s.ProcessedBlocks("peer1"))
assert.Equal(t, uint64(0), s.ProcessedBlocks("peer2"))
},
want: 0.2 * 10,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.2,
},
},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
if tt.update != nil {
tt.update(scorer)
}
assert.Equal(t, tt.want, scorer.MaxScore())
})
}
}
func TestPeerScorer_BlockProvider_FormatScorePretty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
batchSize := uint64(flags.Get().BlockBatchLimit)
prettyFormat := "[%0.1f%%, raw: %v, blocks: %d/%d]"
tests := []struct {
name string
update func(s *peers.BlockProviderScorer)
want string
}{
{
// The minimal max score is the reward for a single batch.
name: "no peers",
update: nil,
want: fmt.Sprintf(prettyFormat, 0.0, 0, 0, batchSize),
},
{
name: "partial batch",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize/4)
},
want: fmt.Sprintf(prettyFormat, 0.0, 0, batchSize/4, batchSize),
},
{
name: "single batch",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize)
// Another peer has increased the maximum attainable number of blocks.
s.IncrementProcessedBlocks("peer2", batchSize*5)
},
want: fmt.Sprintf(prettyFormat, 20.0, 0.05, batchSize, batchSize*5),
},
{
name: "single batch max score",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize)
},
want: fmt.Sprintf(prettyFormat, 100.0, 0.05, batchSize, batchSize),
},
{
name: "3/2 of a batch",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*3/2)
s.IncrementProcessedBlocks("peer2", batchSize*5)
},
want: fmt.Sprintf(prettyFormat, 20.0, 0.05, batchSize*3/2, batchSize*5),
},
{
name: "3/2 of a batch max score",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*3/2)
},
want: fmt.Sprintf(prettyFormat, 100.0, 0.05, batchSize*3/2, batchSize*3/2),
},
{
name: "multiple batches",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
s.IncrementProcessedBlocks("peer2", batchSize*10)
},
want: fmt.Sprintf(prettyFormat, 50.0, 0.05*5, batchSize*5, batchSize*10),
},
{
name: "multiple batches max score",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
},
want: fmt.Sprintf(prettyFormat, 100.0, 0.05*5, batchSize*5, batchSize*5),
},
{
name: "multiple peers",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
s.IncrementProcessedBlocks("peer1", batchSize)
s.IncrementProcessedBlocks("peer2", batchSize*10)
s.IncrementProcessedBlocks("peer1", batchSize/4)
},
want: fmt.Sprintf(prettyFormat, 60.0, 0.05*6, batchSize*6+batchSize/4, batchSize*10),
},
{
name: "decaying",
update: func(s *peers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
s.IncrementProcessedBlocks("peer1", batchSize)
s.IncrementProcessedBlocks("peer2", batchSize*10)
s.IncrementProcessedBlocks("peer1", batchSize/4)
expected := fmt.Sprintf(prettyFormat, 60.0, 0.05*6, batchSize*6+batchSize/4, batchSize*10)
assert.Equal(t, expected, s.FormatScorePretty("peer1"))
s.Decay()
},
want: fmt.Sprintf(prettyFormat, 50.0, 0.05*5, batchSize*5+batchSize/4, batchSize*10),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
},
},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
if tt.update != nil {
tt.update(scorer)
}
assert.Equal(t, tt.want, scorer.FormatScorePretty("peer1"))
})
}
}
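
As a concrete reading of the format exercised above: under the test configuration (ProcessedBatchWeight 0.05) with a 64-block batch limit, the "multiple batches" case reports peer1 at half of the currently attainable maximum. A sketch that reproduces just the string formatting:

package main

import "fmt"

func main() {
	// Mirrors the "multiple batches" case: peer1 processed 5 batches while
	// the best peer processed 10, with batch size 64 and weight 0.05.
	score, maxScore := 0.05*5, 0.05*10
	fmt.Printf("[%0.1f%%, raw: %v, blocks: %d/%d]\n",
		(score/maxScore)*100, score, 64*5, 64*10)
	// Output: [50.0%, raw: 0.25, blocks: 320/640]
}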

beacon-chain/p2p/peers/scorer_manager.go

@@ -8,18 +8,24 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)
// ScoreRoundingFactor defines how many digits to keep in the decimal part of a score.
// This parameter is used in math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor.
const ScoreRoundingFactor = 10000
// PeerScorerManager keeps track of peer scorers that are used to calculate overall peer score.
type PeerScorerManager struct {
ctx context.Context
store *peerDataStore
scorers struct {
badResponsesScorer *BadResponsesScorer
blockProviderScorer *BlockProviderScorer
}
}
// PeerScorerConfig holds configuration parameters for scoring service.
type PeerScorerConfig struct {
BadResponsesScorerConfig *BadResponsesScorerConfig
BlockProviderScorerConfig *BlockProviderScorerConfig
}
// newPeerScorerManager provides fully initialized peer scoring service.
@@ -28,8 +34,8 @@ func newPeerScorerManager(ctx context.Context, store *peerDataStore, config *Pee
ctx: ctx,
store: store,
}
mgr.scorers.badResponsesScorer = newBadResponsesScorer(ctx, store, config.BadResponsesScorerConfig)
mgr.scorers.blockProviderScorer = newBlockProviderScorer(ctx, store, config.BlockProviderScorerConfig)
go mgr.loop(mgr.ctx)
return mgr
@@ -40,6 +46,11 @@ func (m *PeerScorerManager) BadResponsesScorer() *BadResponsesScorer {
return m.scorers.badResponsesScorer
}
// BlockProviderScorer exposes block provider scoring service.
func (m *PeerScorerManager) BlockProviderScorer() *BlockProviderScorer {
return m.scorers.blockProviderScorer
}
// Score returns calculated peer score across all tracked metrics.
func (m *PeerScorerManager) Score(pid peer.ID) float64 {
m.store.RLock()
@@ -50,18 +61,23 @@ func (m *PeerScorerManager) Score(pid peer.ID) float64 {
return 0
}
score += m.scorers.badResponsesScorer.score(pid)
score += m.scorers.blockProviderScorer.score(pid)
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
}
// loop handles background tasks.
func (m *PeerScorerManager) loop(ctx context.Context) {
decayBadResponsesStats := time.NewTicker(m.scorers.badResponsesScorer.Params().DecayInterval)
defer decayBadResponsesStats.Stop()
decayBlockProviderStats := time.NewTicker(m.scorers.blockProviderScorer.Params().DecayInterval)
defer decayBlockProviderStats.Stop()
for {
select {
case <-decayBadResponsesStats.C:
m.scorers.badResponsesScorer.Decay()
case <-decayBlockProviderStats.C:
m.scorers.blockProviderScorer.Decay()
case <-ctx.Done():
return
}
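
The manager's Score is simply the sum of the individual scorers' outputs, rounded once. Tracing the "overall score" test below: ten processed batches at weight 0.05 yield +0.5, while two and then three bad responses contribute -0.4 and -0.6, so the combined score moves from 0.1 to -0.1. A sketch of that bookkeeping (the bad-responses formula of count/threshold times weight, with threshold 5 and weight -1.0, is inferred from the test values):

package main

import (
	"fmt"
	"math"
)

func main() {
	const factor = 10000
	round := func(s float64) float64 { return math.Round(s*factor) / factor }
	blockProvider := 0.05 * 10         // ten processed batches -> +0.5
	badResponses := (2.0 / 5.0) * -1.0 // two bad responses, threshold 5 -> -0.4
	// Rounding also absorbs float noise: 0.5 - 0.4 is not exactly 0.1 in float64.
	fmt.Println(round(blockProvider + badResponses)) // 0.1
	badResponses = (3.0 / 5.0) * -1.0                // a third bad response -> -0.6
	fmt.Println(round(blockProvider + badResponses)) // -0.1
}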

beacon-chain/p2p/peers/scorer_manager_test.go

@@ -2,17 +2,17 @@ package peers_test
import (
"context"
"math"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestPeerScorer_PeerScorerManager_Init(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -21,11 +21,19 @@ func TestPeerScorer_NewPeerScorerManager(t *testing.T) {
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
})
t.Run("bad responses scorer", func(t *testing.T) {
params := peerStatuses.Scorers().BadResponsesScorer().Params()
assert.Equal(t, peers.DefaultBadResponsesThreshold, params.Threshold, "Unexpected threshold value")
assert.Equal(t, peers.DefaultBadResponsesWeight, params.Weight, "Unexpected weight value")
assert.Equal(t, peers.DefaultBadResponsesDecayInterval, params.DecayInterval, "Unexpected decay interval value")
})
t.Run("block providers scorer", func(t *testing.T) {
params := peerStatuses.Scorers().BlockProviderScorer().Params()
assert.Equal(t, peers.DefaultBlockProviderProcessedBatchWeight, params.ProcessedBatchWeight)
assert.Equal(t, peers.DefaultBlockProviderDecayInterval, params.DecayInterval)
})
})
t.Run("explicit config", func(t *testing.T) {
@@ -37,13 +45,27 @@ func TestPeerScorer_NewPeerScorerManager(t *testing.T) {
Weight: -1,
DecayInterval: 1 * time.Minute,
},
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.6,
DecayInterval: 1 * time.Minute,
Decay: 16,
},
},
})
t.Run("bad responses scorer", func(t *testing.T) {
params := peerStatuses.Scorers().BadResponsesScorer().Params()
assert.Equal(t, 2, params.Threshold, "Unexpected threshold value")
assert.Equal(t, -1.0, params.Weight, "Unexpected weight value")
assert.Equal(t, 1*time.Minute, params.DecayInterval, "Unexpected decay interval value")
})
t.Run("block provider scorer", func(t *testing.T) {
params := peerStatuses.Scorers().BlockProviderScorer().Params()
assert.Equal(t, 0.6, params.ProcessedBatchWeight)
assert.Equal(t, 1*time.Minute, params.DecayInterval)
assert.Equal(t, uint64(16), params.Decay)
})
})
}
@@ -51,6 +73,8 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
batchSize := uint64(flags.Get().BlockBatchLimit)
peerScores := func(s *peers.PeerScorerManager, pids []peer.ID) map[string]float64 {
scores := make(map[string]float64, len(pids))
for _, pid := range pids {
@@ -61,9 +85,9 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
pack := func(scorer *peers.PeerScorerManager, s1, s2, s3 float64) map[string]float64 {
return map[string]float64{
"peer1": math.Round(s1*10000) / 10000,
"peer2": math.Round(s2*10000) / 10000,
"peer3": math.Round(s3*10000) / 10000,
"peer1": roundScore(s1),
"peer2": roundScore(s2),
"peer3": roundScore(s3),
}
}
@@ -74,13 +98,18 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
Threshold: 5,
},
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
Decay: 64,
},
},
})
s := peerStatuses.Scorers()
pids := []peer.ID{"peer1", "peer2", "peer3"}
for _, pid := range pids {
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
// A peer that has not been used yet starts with a zero score.
assert.Equal(t, 0.0, s.Score(pid), "Unexpected score for not yet used peer")
}
return s, pids
}
@@ -91,6 +120,7 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
})
s := peerStatuses.Scorers()
assert.Equal(t, 0.0, s.BadResponsesScorer().Score("peer1"))
assert.Equal(t, 0.0, s.BlockProviderScorer().Score("peer1"))
assert.Equal(t, 0.0, s.Score("peer1"))
})
@@ -110,6 +140,59 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
s.BadResponsesScorer().Decay()
assert.DeepEqual(t, pack(s, 0, 0, 0), peerScores(s, pids), "Unexpected scores")
})
t.Run("block providers score", func(t *testing.T) {
s, pids := setupScorer()
s1 := s.BlockProviderScorer()
// Partial batch.
s1.IncrementProcessedBlocks("peer1", batchSize/4)
assert.Equal(t, 0.0, s.Score("peer1"), "Unexpected %q score", "peer1")
// Single batch.
s1.IncrementProcessedBlocks("peer1", batchSize)
assert.DeepEqual(t, pack(s, 0.05, 0, 0), peerScores(s, pids), "Unexpected scores")
// Multiple batches.
s1.IncrementProcessedBlocks("peer2", batchSize*4)
assert.DeepEqual(t, pack(s, 0.05, 0.05*4, 0), peerScores(s, pids), "Unexpected scores")
// Partial batch.
s1.IncrementProcessedBlocks("peer3", batchSize/2)
assert.DeepEqual(t, pack(s, 0.05, 0.05*4, 0), peerScores(s, pids), "Unexpected scores")
// See effect of decaying.
assert.Equal(t, batchSize+batchSize/4, s1.ProcessedBlocks("peer1"))
assert.Equal(t, batchSize*4, s1.ProcessedBlocks("peer2"))
assert.Equal(t, batchSize/2, s1.ProcessedBlocks("peer3"))
assert.DeepEqual(t, pack(s, 0.05, 0.05*4, 0), peerScores(s, pids), "Unexpected scores")
s1.Decay()
assert.Equal(t, batchSize/4, s1.ProcessedBlocks("peer1"))
assert.Equal(t, batchSize*3, s1.ProcessedBlocks("peer2"))
assert.Equal(t, uint64(0), s1.ProcessedBlocks("peer3"))
assert.DeepEqual(t, pack(s, 0, 0.05*3, 0), peerScores(s, pids), "Unexpected scores")
})
t.Run("overall score", func(t *testing.T) {
// Full score, no penalty.
s, _ := setupScorer()
s1 := s.BlockProviderScorer()
s2 := s.BadResponsesScorer()
s1.IncrementProcessedBlocks("peer1", batchSize*10)
assert.Equal(t, roundScore(0.05*10), s1.Score("peer1"))
// Now, adjust score by introducing penalty for bad responses.
s2.Increment("peer1")
s2.Increment("peer1")
assert.Equal(t, -0.4, s2.Score("peer1"), "Unexpected bad responses score")
assert.Equal(t, roundScore(0.05*10), s1.Score("peer1"), "Unexpected block provider score")
assert.Equal(t, roundScore(0.05*10-0.4), s.Score("peer1"), "Unexpected overall score")
// If peer continues to misbehave, score becomes negative.
s2.Increment("peer1")
assert.Equal(t, -0.6, s2.Score("peer1"), "Unexpected bad responses score")
assert.Equal(t, roundScore(0.05*10), s1.Score("peer1"), "Unexpected block provider score")
assert.Equal(t, -0.1, s.Score("peer1"), "Unexpected overall score")
})
}
func TestPeerScorer_PeerScorerManager_loop(t *testing.T) {
@@ -124,16 +207,24 @@ func TestPeerScorer_PeerScorerManager_loop(t *testing.T) {
Weight: -0.5,
DecayInterval: 50 * time.Millisecond,
},
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
DecayInterval: 25 * time.Millisecond,
Decay: 64,
},
},
})
s1 := peerStatuses.Scorers().BadResponsesScorer()
s2 := peerStatuses.Scorers().BlockProviderScorer()
pid1 := peer.ID("peer1")
peerStatuses.Add(nil, pid1, nil, network.DirUnknown)
for i := 0; i < s1.Params().Threshold+5; i++ {
s1.Increment(pid1)
}
assert.Equal(t, true, s1.IsBadPeer(pid1), "Peer should be marked as bad")
s2.IncrementProcessedBlocks("peer1", 221)
assert.Equal(t, uint64(221), s2.ProcessedBlocks("peer1"))
done := make(chan struct{}, 1)
go func() {
@@ -144,7 +235,7 @@ func TestPeerScorer_PeerScorerManager_loop(t *testing.T) {
for {
select {
case <-ticker.C:
if s1.IsBadPeer(pid1) == false && s2.ProcessedBlocks("peer1") == 0 {
return
}
case <-ctx.Done():
@@ -155,5 +246,6 @@ func TestPeerScorer_PeerScorerManager_loop(t *testing.T) {
}()
<-done
assert.Equal(t, false, s1.IsBadPeer(pid1), "Peer should not be marked as bad")
assert.Equal(t, uint64(0), s2.ProcessedBlocks("peer1"), "No blocks are expected")
}

beacon-chain/p2p/peers/store.go

@@ -37,6 +37,7 @@ type peerData struct {
metaData *pb.MetaData
chainStateLastUpdated time.Time
badResponses int
processedBlocks uint64
}
// newPeerDataStore creates peer store.