From b241fafd13ea364380a7e54213112e51af0c8b52 Mon Sep 17 00:00:00 2001 From: Victor Farazdagi Date: Sun, 9 Aug 2020 23:26:09 +0300 Subject: [PATCH] Peer scorer: weighted sorting, requests capping, stale peers handling. (#6928) * cherry pick * fixes test * go tidy * Merge refs/heads/master into peer-scorer-weighted-sorter * Merge refs/heads/master into peer-scorer-weighted-sorter * Merge refs/heads/master into peer-scorer-weighted-sorter * Merge refs/heads/master into peer-scorer-weighted-sorter * Merge refs/heads/master into peer-scorer-weighted-sorter * Merge refs/heads/master into peer-scorer-weighted-sorter * Merge refs/heads/master into peer-scorer-weighted-sorter * Merge refs/heads/master into peer-scorer-weighted-sorter * Merge refs/heads/master into peer-scorer-weighted-sorter --- beacon-chain/p2p/peers/BUILD.bazel | 3 + .../p2p/peers/score_block_providers.go | 176 ++++++-- .../p2p/peers/score_block_providers_test.go | 382 ++++++++++++------ beacon-chain/p2p/peers/scorer_manager_test.go | 36 +- beacon-chain/p2p/peers/store.go | 1 + 5 files changed, 422 insertions(+), 176 deletions(-) diff --git a/beacon-chain/p2p/peers/BUILD.bazel b/beacon-chain/p2p/peers/BUILD.bazel index a751a59d11..459cfbd41f 100644 --- a/beacon-chain/p2p/peers/BUILD.bazel +++ b/beacon-chain/p2p/peers/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/flags:go_default_library", "//proto/beacon/p2p/v1:go_default_library", + "//shared/rand:go_default_library", "//shared/roughtime:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", @@ -43,6 +44,8 @@ go_test( "//proto/beacon/p2p/v1:go_default_library", "//shared/featureconfig:go_default_library", "//shared/params:go_default_library", + "//shared/rand:go_default_library", + "//shared/roughtime:go_default_library", "//shared/testutil/assert:go_default_library", "//shared/testutil/require:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", diff --git a/beacon-chain/p2p/peers/score_block_providers.go b/beacon-chain/p2p/peers/score_block_providers.go index 686946b747..8ef6dd2d0b 100644 --- a/beacon-chain/p2p/peers/score_block_providers.go +++ b/beacon-chain/p2p/peers/score_block_providers.go @@ -9,13 +9,26 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/prysmaticlabs/prysm/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/shared/rand" + "github.com/prysmaticlabs/prysm/shared/roughtime" ) const ( // DefaultBlockProviderProcessedBatchWeight is a default reward weight of a processed batch of blocks. - DefaultBlockProviderProcessedBatchWeight = 0.05 + DefaultBlockProviderProcessedBatchWeight = float64(0.05) + // DefaultBlockProviderProcessedBlocksCap defines default value for processed blocks cap. + // e.g. 20 * 64 := 20 batches of size 64 (with 0.05 per batch reward, 20 batches result in score of 1.0). + DefaultBlockProviderProcessedBlocksCap = uint64(20 * 64) // DefaultBlockProviderDecayInterval defines how often the decaying routine is called. - DefaultBlockProviderDecayInterval = 1 * time.Minute + DefaultBlockProviderDecayInterval = 30 * time.Second + // DefaultBlockProviderDecay defines default blocks that are to be subtracted from stats on each + // decay interval. Effectively, this param provides minimum expected performance for a peer to remain + // high scorer. 
+ DefaultBlockProviderDecay = uint64(5 * 64) + // DefaultBlockProviderStalePeerRefreshInterval defines default interval at which peers should be given + // opportunity to provide blocks (their score gets boosted, up until they are selected for + // fetching). + DefaultBlockProviderStalePeerRefreshInterval = 1 * time.Minute ) // BlockProviderScorer represents block provider scoring service. @@ -23,19 +36,27 @@ type BlockProviderScorer struct { ctx context.Context config *BlockProviderScorerConfig store *peerDataStore - // highestProcessedBlocksCount defines maximum number of processed blocks attained by some peer during - // the lifetime of a service. This is used to gauge relative performance of other peers. - highestProcessedBlocksCount uint64 + // maxScore is a cached value for maximum attainable block provider score. + // It is calculated, on startup, as following: (processedBlocksCap / batchSize) * batchWeight. + maxScore float64 } // BlockProviderScorerConfig holds configuration parameters for block providers scoring service. type BlockProviderScorerConfig struct { // ProcessedBatchWeight defines a reward for a single processed batch of blocks. ProcessedBatchWeight float64 + // ProcessedBlocksCap defines the highest number of processed blocks that are counted towards peer's score. + // Once that cap is attained, peer is considered good to fetch from (and several peers having the + // same score, are picked at random). To stay at max score, peer must continue to perform, as + // stats decays quickly. + ProcessedBlocksCap uint64 // DecayInterval defines how often stats should be decayed. DecayInterval time.Duration // Decay specifies number of blocks subtracted from stats on each decay step. Decay uint64 + // StalePeerRefreshInterval is an interval at which peers should be given an opportunity + // to provide blocks (scores are boosted to max up until such peers are selected). + StalePeerRefreshInterval time.Duration } // newBlockProviderScorer creates block provider scoring service. @@ -55,10 +76,21 @@ func newBlockProviderScorer( if scorer.config.DecayInterval == 0 { scorer.config.DecayInterval = DefaultBlockProviderDecayInterval } - batchSize := uint64(flags.Get().BlockBatchLimit) - scorer.highestProcessedBlocksCount = batchSize + if scorer.config.ProcessedBlocksCap == 0 { + scorer.config.ProcessedBlocksCap = DefaultBlockProviderProcessedBlocksCap + } if scorer.config.Decay == 0 { - scorer.config.Decay = batchSize + scorer.config.Decay = DefaultBlockProviderDecay + } + if scorer.config.StalePeerRefreshInterval == 0 { + scorer.config.StalePeerRefreshInterval = DefaultBlockProviderStalePeerRefreshInterval + } + batchSize := uint64(flags.Get().BlockBatchLimit) + scorer.maxScore = 1.0 + if batchSize > 0 { + totalBatches := float64(scorer.config.ProcessedBlocksCap / batchSize) + scorer.maxScore = totalBatches * scorer.config.ProcessedBatchWeight + scorer.maxScore = math.Round(scorer.maxScore*ScoreRoundingFactor) / ScoreRoundingFactor } return scorer } @@ -74,8 +106,9 @@ func (s *BlockProviderScorer) Score(pid peer.ID) float64 { func (s *BlockProviderScorer) score(pid peer.ID) float64 { score := float64(0) peerData, ok := s.store.peers[pid] - if !ok { - return score + // Boost score of new peers or peers that haven't been accessed for too long. 
+ if !ok || time.Since(peerData.blockProviderUpdated) >= s.config.StalePeerRefreshInterval { + return s.maxScore } batchSize := uint64(flags.Get().BlockBatchLimit) if batchSize > 0 { @@ -94,6 +127,7 @@ func (s *BlockProviderScorer) Params() *BlockProviderScorerConfig { func (s *BlockProviderScorer) IncrementProcessedBlocks(pid peer.ID, cnt uint64) { s.store.Lock() defer s.store.Unlock() + defer s.touch(pid) if cnt <= 0 { return @@ -101,9 +135,31 @@ func (s *BlockProviderScorer) IncrementProcessedBlocks(pid peer.ID, cnt uint64) if _, ok := s.store.peers[pid]; !ok { s.store.peers[pid] = &peerData{} } - s.store.peers[pid].processedBlocks += cnt - if s.store.peers[pid].processedBlocks > s.highestProcessedBlocksCount { - s.highestProcessedBlocksCount = s.store.peers[pid].processedBlocks + if s.store.peers[pid].processedBlocks+cnt > s.config.ProcessedBlocksCap { + cnt = s.config.ProcessedBlocksCap - s.store.peers[pid].processedBlocks + } + if cnt > 0 { + s.store.peers[pid].processedBlocks += cnt + } +} + +// Touch updates last access time for a given peer. This allows to detect peers that are +// stale and boost their scores to increase chances in block fetching participation. +func (s *BlockProviderScorer) Touch(pid peer.ID, t ...time.Time) { + s.store.Lock() + defer s.store.Unlock() + s.touch(pid, t...) +} + +// touch is a lock-free version of Touch. +func (s *BlockProviderScorer) touch(pid peer.ID, t ...time.Time) { + if _, ok := s.store.peers[pid]; !ok { + s.store.peers[pid] = &peerData{} + } + if len(t) == 1 { + s.store.peers[pid].blockProviderUpdated = t[0] + } else { + s.store.peers[pid].blockProviderUpdated = roughtime.Now() } } @@ -138,42 +194,98 @@ func (s *BlockProviderScorer) Decay() { } } -// Sorted returns list of block providers sorted by score in descending order. -func (s *BlockProviderScorer) Sorted(pids []peer.ID) []peer.ID { - s.store.Lock() - defer s.store.Unlock() - +// WeightSorted returns a list of block providers weight sorted by score, where items are selected +// probabilistically with more "heavy" items having a higher chance of being picked. +func (s *BlockProviderScorer) WeightSorted( + r *rand.Rand, pids []peer.ID, scoreFn func(pid peer.ID, score float64) float64, +) []peer.ID { if len(pids) == 0 { return pids } - scores := make(map[peer.ID]float64, len(pids)) - peers := make([]peer.ID, len(pids)) - for i, pid := range pids { - scores[pid] = s.score(pid) - peers[i] = pid + s.store.Lock() + defer s.store.Unlock() + + // See http://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ for details. + nextPID := func(weights map[peer.ID]float64) peer.ID { + totalWeight := 0 + for _, w := range weights { + totalWeight += int(w) + } + if totalWeight <= 0 { + return "" + } + rnd := r.Intn(totalWeight) + for pid, w := range weights { + rnd -= int(w) + if rnd < 0 { + return pid + } + } + return "" } + + scores, _ := s.mapScoresAndPeers(pids, scoreFn) + peers := make([]peer.ID, 0) + for i := 0; i < len(pids); i++ { + if pid := nextPID(scores); pid != "" { + peers = append(peers, pid) + delete(scores, pid) + } + } + // Left over peers (like peers having zero weight), are added at the end of the list. + for pid := range scores { + peers = append(peers, pid) + } + + return peers +} + +// Sorted returns a list of block providers sorted by score in descending order. +// When custom scorer function is provided, items are returned in order provided by it. 
+func (s *BlockProviderScorer) Sorted( + pids []peer.ID, scoreFn func(pid peer.ID, score float64) float64, +) []peer.ID { + if len(pids) == 0 { + return pids + } + s.store.Lock() + defer s.store.Unlock() + + scores, peers := s.mapScoresAndPeers(pids, scoreFn) sort.Slice(peers, func(i, j int) bool { return scores[peers[i]] > scores[peers[j]] }) return peers } +// mapScoresAndPeers is a utility function to map peers and their respective scores (using custom +// scoring function if necessary). +func (s *BlockProviderScorer) mapScoresAndPeers( + pids []peer.ID, scoreFn func(pid peer.ID, score float64) float64, +) (map[peer.ID]float64, []peer.ID) { + scores := make(map[peer.ID]float64, len(pids)) + peers := make([]peer.ID, len(pids)) + for i, pid := range pids { + if scoreFn != nil { + scores[pid] = scoreFn(pid, s.score(pid)) + } else { + scores[pid] = s.score(pid) + } + peers[i] = pid + } + return scores, peers +} + // FormatScorePretty returns full scoring information in a human-readable format. func (s *BlockProviderScorer) FormatScorePretty(pid peer.ID) string { s.store.RLock() defer s.store.RUnlock() score := s.score(pid) - return fmt.Sprintf("[%0.1f%%, raw: %v, blocks: %d/%d]", - (score/s.MaxScore())*100, score, s.processedBlocks(pid), s.highestProcessedBlocksCount) + return fmt.Sprintf("[%0.1f%%, raw: %0.2f, blocks: %d/%d]", + (score/s.MaxScore())*100, score, s.processedBlocks(pid), s.config.ProcessedBlocksCap) } // MaxScore exposes maximum score attainable by peers. func (s *BlockProviderScorer) MaxScore() float64 { - score := s.Params().ProcessedBatchWeight - batchSize := uint64(flags.Get().BlockBatchLimit) - if batchSize > 0 { - totalProcessedBatches := float64(s.highestProcessedBlocksCount / batchSize) - score = totalProcessedBatches * s.config.ProcessedBatchWeight - } - return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor + return s.maxScore } diff --git a/beacon-chain/p2p/peers/score_block_providers_test.go b/beacon-chain/p2p/peers/score_block_providers_test.go index ae3651cf3e..63fdaefd1f 100644 --- a/beacon-chain/p2p/peers/score_block_providers_test.go +++ b/beacon-chain/p2p/peers/score_block_providers_test.go @@ -3,11 +3,14 @@ package peers_test import ( "context" "fmt" + "sort" "testing" "github.com/libp2p/go-libp2p-core/peer" "github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" + "github.com/prysmaticlabs/prysm/shared/rand" + "github.com/prysmaticlabs/prysm/shared/roughtime" "github.com/prysmaticlabs/prysm/shared/testutil/assert" ) @@ -15,34 +18,116 @@ func TestPeerScorer_BlockProvider_Score(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ - PeerLimit: 30, - ScorerParams: &peers.PeerScorerConfig{ - BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{ - ProcessedBatchWeight: 0.05, + batchSize := uint64(flags.Get().BlockBatchLimit) + tests := []struct { + name string + update func(scorer *peers.BlockProviderScorer) + check func(scorer *peers.BlockProviderScorer) + }{ + { + name: "nonexistent peer", + check: func(scorer *peers.BlockProviderScorer) { + assert.Equal(t, scorer.MaxScore(), scorer.Score("peer1"), "Unexpected score") }, }, - }) - scorer := peerStatuses.Scorers().BlockProviderScorer() - batchSize := uint64(flags.Get().BlockBatchLimit) + { + name: "existent peer with zero score", + update: func(scorer *peers.BlockProviderScorer) { + scorer.Touch("peer1") + }, + check: func(scorer 
*peers.BlockProviderScorer) { + assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score") + }, + }, + { + name: "existent peer via increment", + update: func(scorer *peers.BlockProviderScorer) { + scorer.IncrementProcessedBlocks("peer1", 0) + }, + check: func(scorer *peers.BlockProviderScorer) { + assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score") + }, + }, + { + name: "boost score of stale peer", + update: func(scorer *peers.BlockProviderScorer) { + scorer.IncrementProcessedBlocks("peer1", batchSize*3) + assert.Equal(t, 0.05*3, scorer.Score("peer1"), "Unexpected score") + scorer.Touch("peer1", roughtime.Now().Add(-1*scorer.Params().StalePeerRefreshInterval)) + }, + check: func(scorer *peers.BlockProviderScorer) { + assert.Equal(t, scorer.MaxScore(), scorer.Score("peer1"), "Unexpected score") + }, + }, + { + name: "increment with 0 score", + update: func(scorer *peers.BlockProviderScorer) { + // Increment to zero (provider is added to cache but score is unchanged). + scorer.IncrementProcessedBlocks("peer1", 0) + }, + check: func(scorer *peers.BlockProviderScorer) { + assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score") + }, + }, + { + name: "partial score", + update: func(scorer *peers.BlockProviderScorer) { + // Partial score (less than a single batch of blocks processed). + scorer.IncrementProcessedBlocks("peer1", batchSize/2) + }, + check: func(scorer *peers.BlockProviderScorer) { + assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score") + }, + }, + { + name: "single batch", + update: func(scorer *peers.BlockProviderScorer) { + scorer.IncrementProcessedBlocks("peer1", batchSize) + }, + check: func(scorer *peers.BlockProviderScorer) { + assert.Equal(t, roundScore(0.05), scorer.Score("peer1"), "Unexpected score") + }, + }, + { + name: "multiple batches", + update: func(scorer *peers.BlockProviderScorer) { + scorer.IncrementProcessedBlocks("peer1", batchSize*13) + }, + check: func(scorer *peers.BlockProviderScorer) { + assert.Equal(t, roundScore(0.05*13), scorer.Score("peer1"), "Unexpected score") + }, + }, + { + name: "maximum score cap", + update: func(scorer *peers.BlockProviderScorer) { + scorer.IncrementProcessedBlocks("peer1", batchSize*2) + assert.Equal(t, roundScore(0.05*2), scorer.Score("peer1"), "Unexpected score") + scorer.IncrementProcessedBlocks("peer1", scorer.Params().ProcessedBlocksCap) + }, + check: func(scorer *peers.BlockProviderScorer) { + assert.Equal(t, scorer.Params().ProcessedBlocksCap, scorer.ProcessedBlocks("peer1")) + assert.Equal(t, 1.0, scorer.Score("peer1")) + }, + }, + } - // Start with non-exitent provider. - assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score for unregistered provider") - // Increment to zero (provider is added to cache but score is unchanged). - scorer.IncrementProcessedBlocks("peer1", 0) - assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score for registered provider") - - // Partial score (less than a single batch of blocks processed). - scorer.IncrementProcessedBlocks("peer1", batchSize/2) - assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score") - - // Single batch. - scorer.IncrementProcessedBlocks("peer1", batchSize) - assert.Equal(t, roundScore(0.05), scorer.Score("peer1"), "Unexpected score") - - // Multiple batches. 
- scorer.IncrementProcessedBlocks("peer2", batchSize*13) - assert.Equal(t, roundScore(0.05*13), scorer.Score("peer2"), "Unexpected score") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{ + ProcessedBatchWeight: 0.05, + }, + }, + }) + scorer := peerStatuses.Scorers().BlockProviderScorer() + if tt.update != nil { + tt.update(scorer) + } + tt.check(scorer) + }) + } } func TestPeerScorer_BlockProvider_GettersSetters(t *testing.T) { @@ -59,11 +144,76 @@ func TestPeerScorer_BlockProvider_GettersSetters(t *testing.T) { assert.Equal(t, uint64(64), scorer.ProcessedBlocks("peer1")) } +func TestPeerScorer_BlockProvider_WeightSorted(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + ScorerParams: &peers.PeerScorerConfig{ + BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{ + ProcessedBatchWeight: 1, + }, + }, + }) + scorer := peerStatuses.Scorers().BlockProviderScorer() + batchSize := uint64(flags.Get().BlockBatchLimit) + r := rand.NewDeterministicGenerator() + + reverse := func(pids []peer.ID) []peer.ID { + tmp := make([]peer.ID, len(pids)) + copy(tmp, pids) + for i, j := 0, len(tmp)-1; i < j; i, j = i+1, j-1 { + tmp[i], tmp[j] = tmp[j], tmp[i] + } + return tmp + } + + shuffle := func(pids []peer.ID) []peer.ID { + tmp := make([]peer.ID, len(pids)) + copy(tmp, pids) + r.Shuffle(len(tmp), func(i, j int) { + tmp[i], tmp[j] = tmp[j], tmp[i] + }) + return tmp + } + + var pids []peer.ID + for i := uint64(0); i < 10; i++ { + pid := peer.ID(i) + scorer.IncrementProcessedBlocks(pid, i*batchSize) + pids = append(pids, pid) + } + // Make sure that peers scores are correct (peer(n).score > peer(n-1).score). + // Peers should be returned in descending order (by score). + assert.DeepEqual(t, reverse(pids), scorer.Sorted(pids, nil)) + + // Run weighted sort lots of time, to get accurate statistics of whether more heavy items + // are indeed preferred when sorting. + scores := make(map[peer.ID]int, len(pids)) + for i := 0; i < 1000; i++ { + score := len(pids) - 1 + // The earlier in the list the item is, the more of a score will it get. 
+ for _, pid := range scorer.WeightSorted(r, shuffle(pids), nil) { + scores[pid] += score + score-- + } + } + var scoredPIDs []peer.ID + for pid := range scores { + scoredPIDs = append(scoredPIDs, pid) + } + sort.Slice(scoredPIDs, func(i, j int) bool { + return scores[scoredPIDs[i]] > scores[scoredPIDs[j]] + }) + assert.Equal(t, len(pids), len(scoredPIDs)) + assert.DeepEqual(t, reverse(pids), scoredPIDs, "Expected items with more weight to be picked more often") +} + func TestPeerScorer_BlockProvider_Sorted(t *testing.T) { batchSize := uint64(flags.Get().BlockBatchLimit) tests := []struct { name string update func(s *peers.BlockProviderScorer) + score func(pid peer.ID, score float64) float64 have []peer.ID want []peer.ID }{ @@ -113,6 +263,25 @@ func TestPeerScorer_BlockProvider_Sorted(t *testing.T) { have: []peer.ID{"peer3", "peer2", "peer1"}, want: []peer.ID{"peer1", "peer3", "peer2"}, }, + { + name: "custom scorer", + update: func(s *peers.BlockProviderScorer) { + s.IncrementProcessedBlocks("peer1", batchSize*3) + s.IncrementProcessedBlocks("peer2", batchSize*1) + s.IncrementProcessedBlocks("peer3", batchSize*2) + }, + score: func(pid peer.ID, score float64) float64 { + if pid == "peer2" { + return score + 0.3 // 0.2 + 0.3 = 0.5 > 0.4 (of peer3) + } + if pid == "peer1" { + return 0.0 + } + return score + }, + have: []peer.ID{"peer3", "peer2", "peer1"}, + want: []peer.ID{"peer2", "peer3", "peer1"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -127,7 +296,7 @@ func TestPeerScorer_BlockProvider_Sorted(t *testing.T) { }) scorer := peerStatuses.Scorers().BlockProviderScorer() tt.update(scorer) - assert.DeepEqual(t, tt.want, scorer.Sorted(tt.have)) + assert.DeepEqual(t, tt.want, scorer.Sorted(tt.have, tt.score)) }) } } @@ -138,70 +307,22 @@ func TestPeerScorer_BlockProvider_MaxScore(t *testing.T) { batchSize := uint64(flags.Get().BlockBatchLimit) tests := []struct { - name string - update func(s *peers.BlockProviderScorer) - want float64 + name string + cfg *peers.BlockProviderScorerConfig + want float64 }{ { - // Minimal max.score is a reward for a single batch. - name: "no peers", - update: nil, - want: 0.2, + name: "default config", + cfg: &peers.BlockProviderScorerConfig{}, + want: 1.0, }, { - name: "partial batch", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize/4) + name: "custom config", + cfg: &peers.BlockProviderScorerConfig{ + ProcessedBatchWeight: 0.5, + ProcessedBlocksCap: batchSize * 300, }, - want: 0.2, - }, - { - name: "single batch", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize) - }, - want: 0.2, - }, - { - name: "3/2 of a batch", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize*3/2) - }, - want: 0.2, - }, - { - name: "multiple batches", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize*5) - }, - want: 0.2 * 5, - }, - { - name: "multiple peers", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize*5) - s.IncrementProcessedBlocks("peer1", batchSize) - s.IncrementProcessedBlocks("peer2", batchSize*10) - s.IncrementProcessedBlocks("peer1", batchSize/4) - }, - want: 0.2 * 10, - }, - { - // Even after stats is decayed, max. attained blocks count must remain - // (as a ballpark of overall performance of peers during life-cycle of service). 
- name: "decaying", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize*5) - s.IncrementProcessedBlocks("peer1", batchSize) - s.IncrementProcessedBlocks("peer2", batchSize*10) - s.IncrementProcessedBlocks("peer1", batchSize/4) - for i := 0; i < 10; i++ { - s.Decay() - } - assert.Equal(t, uint64(0), s.ProcessedBlocks("peer1")) - assert.Equal(t, uint64(0), s.ProcessedBlocks("peer2")) - }, - want: 0.2 * 10, + want: 150.0, }, } @@ -209,15 +330,10 @@ func TestPeerScorer_BlockProvider_MaxScore(t *testing.T) { t.Run(tt.name, func(t *testing.T) { peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ ScorerParams: &peers.PeerScorerConfig{ - BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{ - ProcessedBatchWeight: 0.2, - }, + BlockProviderScorerConfig: tt.cfg, }, }) scorer := peerStatuses.Scorers().BlockProviderScorer() - if tt.update != nil { - tt.update(scorer) - } assert.Equal(t, tt.want, scorer.MaxScore()) }) } @@ -227,94 +343,94 @@ func TestPeerScorer_BlockProvider_FormatScorePretty(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() batchSize := uint64(flags.Get().BlockBatchLimit) - prettyFormat := "[%0.1f%%, raw: %v, blocks: %d/%d]" + format := "[%0.1f%%, raw: %0.2f, blocks: %d/1280]" tests := []struct { name string update func(s *peers.BlockProviderScorer) - want string + check func(s *peers.BlockProviderScorer) }{ { - // Minimal max.score is a reward for a single batch. - name: "no peers", + name: "peer not registered", update: nil, - want: fmt.Sprintf(prettyFormat, 0.0, 0, 0, batchSize), + check: func(s *peers.BlockProviderScorer) { + assert.Equal(t, fmt.Sprintf(format, 100.0, 1.0, 0), s.FormatScorePretty("peer1")) + }, + }, + { + name: "peer registered zero blocks", + update: func(s *peers.BlockProviderScorer) { + s.Touch("peer1") + }, + check: func(s *peers.BlockProviderScorer) { + assert.Equal(t, fmt.Sprintf(format, 0.0, 0.0, 0), s.FormatScorePretty("peer1")) + }, }, { name: "partial batch", update: func(s *peers.BlockProviderScorer) { s.IncrementProcessedBlocks("peer1", batchSize/4) }, - want: fmt.Sprintf(prettyFormat, 0.0, 0, batchSize/4, batchSize), + check: func(s *peers.BlockProviderScorer) { + assert.Equal(t, fmt.Sprintf(format, 0.0, 0.0, batchSize/4), s.FormatScorePretty("peer1")) + }, }, { name: "single batch", update: func(s *peers.BlockProviderScorer) { s.IncrementProcessedBlocks("peer1", batchSize) - // There was some other peer who increased maximally attainable number of blocks. 
- s.IncrementProcessedBlocks("peer2", batchSize*5) }, - want: fmt.Sprintf(prettyFormat, 20.0, 0.05, batchSize, batchSize*5), - }, - { - name: "single batch max score", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize) + check: func(s *peers.BlockProviderScorer) { + assert.Equal(t, fmt.Sprintf(format, 5.0, 0.05, batchSize), s.FormatScorePretty("peer1")) }, - want: fmt.Sprintf(prettyFormat, 100.0, 0.05, batchSize, batchSize), }, { name: "3/2 of a batch", update: func(s *peers.BlockProviderScorer) { s.IncrementProcessedBlocks("peer1", batchSize*3/2) - s.IncrementProcessedBlocks("peer2", batchSize*5) }, - want: fmt.Sprintf(prettyFormat, 20.0, 0.05, batchSize*3/2, batchSize*5), - }, - { - name: "3/2 of a batch max score", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize*3/2) + check: func(s *peers.BlockProviderScorer) { + assert.Equal(t, fmt.Sprintf(format, 5.0, 0.05, batchSize*3/2), s.FormatScorePretty("peer1")) }, - want: fmt.Sprintf(prettyFormat, 100.0, 0.05, batchSize*3/2, batchSize*3/2), }, { name: "multiple batches", update: func(s *peers.BlockProviderScorer) { s.IncrementProcessedBlocks("peer1", batchSize*5) - s.IncrementProcessedBlocks("peer2", batchSize*10) }, - want: fmt.Sprintf(prettyFormat, 50.0, 0.05*5, batchSize*5, batchSize*10), + check: func(s *peers.BlockProviderScorer) { + assert.Equal(t, fmt.Sprintf(format, 25.0, 0.05*5, batchSize*5), s.FormatScorePretty("peer1")) + }, }, { name: "multiple batches max score", update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize*5) + s.IncrementProcessedBlocks("peer1", s.Params().ProcessedBlocksCap*5) }, - want: fmt.Sprintf(prettyFormat, 100.0, 0.05*5, batchSize*5, batchSize*5), - }, - { - name: "multiple peers", - update: func(s *peers.BlockProviderScorer) { - s.IncrementProcessedBlocks("peer1", batchSize*5) - s.IncrementProcessedBlocks("peer1", batchSize) - s.IncrementProcessedBlocks("peer2", batchSize*10) - s.IncrementProcessedBlocks("peer1", batchSize/4) + check: func(s *peers.BlockProviderScorer) { + want := fmt.Sprintf(format, 100.0, 1.0, s.Params().ProcessedBlocksCap) + assert.Equal(t, want, s.FormatScorePretty("peer1")) }, - want: fmt.Sprintf(prettyFormat, 60.0, 0.05*6, batchSize*6+batchSize/4, batchSize*10), }, { name: "decaying", update: func(s *peers.BlockProviderScorer) { s.IncrementProcessedBlocks("peer1", batchSize*5) s.IncrementProcessedBlocks("peer1", batchSize) - s.IncrementProcessedBlocks("peer2", batchSize*10) s.IncrementProcessedBlocks("peer1", batchSize/4) - expected := fmt.Sprintf(prettyFormat, 60.0, 0.05*6, batchSize*6+batchSize/4, batchSize*10) - assert.Equal(t, expected, s.FormatScorePretty("peer1")) + want := fmt.Sprintf(format, 30.0, 0.05*6, batchSize*6+batchSize/4) + assert.Equal(t, want, s.FormatScorePretty("peer1")) + // Maximize block count. + s.IncrementProcessedBlocks("peer1", s.Params().ProcessedBlocksCap) + want = fmt.Sprintf(format, 100.0, 1.0, s.Params().ProcessedBlocksCap) + assert.Equal(t, want, s.FormatScorePretty("peer1")) + // Half of blocks is to be decayed. 
s.Decay() }, - want: fmt.Sprintf(prettyFormat, 50.0, 0.05*5, batchSize*5+batchSize/4, batchSize*10), + check: func(s *peers.BlockProviderScorer) { + want := fmt.Sprintf(format, 50.0, 0.5, s.Params().ProcessedBlocksCap/2) + assert.Equal(t, want, s.FormatScorePretty("peer1")) + }, }, } @@ -324,6 +440,8 @@ func TestPeerScorer_BlockProvider_FormatScorePretty(t *testing.T) { ScorerParams: &peers.PeerScorerConfig{ BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{ ProcessedBatchWeight: 0.05, + ProcessedBlocksCap: 20 * batchSize, + Decay: 10 * batchSize, }, }, }) @@ -331,7 +449,7 @@ func TestPeerScorer_BlockProvider_FormatScorePretty(t *testing.T) { if tt.update != nil { tt.update(scorer) } - assert.Equal(t, tt.want, scorer.FormatScorePretty("peer1")) + tt.check(scorer) }) } } diff --git a/beacon-chain/p2p/peers/scorer_manager_test.go b/beacon-chain/p2p/peers/scorer_manager_test.go index a83e868754..cc830ecf97 100644 --- a/beacon-chain/p2p/peers/scorer_manager_test.go +++ b/beacon-chain/p2p/peers/scorer_manager_test.go @@ -16,6 +16,8 @@ func TestPeerScorer_PeerScorerManager_Init(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + batchSize := uint64(flags.Get().BlockBatchLimit) + t.Run("default config", func(t *testing.T) { peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ PeerLimit: 30, @@ -32,7 +34,10 @@ func TestPeerScorer_PeerScorerManager_Init(t *testing.T) { t.Run("block providers scorer", func(t *testing.T) { params := peerStatuses.Scorers().BlockProviderScorer().Params() assert.Equal(t, peers.DefaultBlockProviderProcessedBatchWeight, params.ProcessedBatchWeight) + assert.Equal(t, peers.DefaultBlockProviderProcessedBlocksCap, params.ProcessedBlocksCap) assert.Equal(t, peers.DefaultBlockProviderDecayInterval, params.DecayInterval) + assert.Equal(t, peers.DefaultBlockProviderDecay, params.Decay) + assert.Equal(t, peers.DefaultBlockProviderStalePeerRefreshInterval, params.StalePeerRefreshInterval) }) }) @@ -46,9 +51,11 @@ func TestPeerScorer_PeerScorerManager_Init(t *testing.T) { DecayInterval: 1 * time.Minute, }, BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{ - ProcessedBatchWeight: 0.6, - DecayInterval: 1 * time.Minute, - Decay: 16, + ProcessedBatchWeight: 0.2, + ProcessedBlocksCap: batchSize * 5, + DecayInterval: 1 * time.Minute, + Decay: 16, + StalePeerRefreshInterval: 5 * time.Hour, }, }, }) @@ -62,9 +69,12 @@ func TestPeerScorer_PeerScorerManager_Init(t *testing.T) { t.Run("block provider scorer", func(t *testing.T) { params := peerStatuses.Scorers().BlockProviderScorer().Params() - assert.Equal(t, 0.6, params.ProcessedBatchWeight) + assert.Equal(t, 0.2, params.ProcessedBatchWeight) + assert.Equal(t, batchSize*5, params.ProcessedBlocksCap) assert.Equal(t, 1*time.Minute, params.DecayInterval) assert.Equal(t, uint64(16), params.Decay) + assert.Equal(t, 5*time.Hour, params.StalePeerRefreshInterval) + assert.Equal(t, 1.0, peerStatuses.Scorers().BlockProviderScorer().MaxScore()) }) }) } @@ -109,7 +119,7 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) { for _, pid := range pids { peerStatuses.Add(nil, pid, nil, network.DirUnknown) // Not yet used peer gets boosted score. 
- assert.Equal(t, 0.0, s.Score(pid), "Unexpected score for not yet used peer") + assert.Equal(t, s.BlockProviderScorer().MaxScore(), s.Score(pid), "Unexpected score for not yet used peer") } return s, pids } @@ -120,30 +130,32 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) { }) s := peerStatuses.Scorers() assert.Equal(t, 0.0, s.BadResponsesScorer().Score("peer1")) - assert.Equal(t, 0.0, s.BlockProviderScorer().Score("peer1")) + assert.Equal(t, s.BlockProviderScorer().MaxScore(), s.BlockProviderScorer().Score("peer1")) assert.Equal(t, 0.0, s.Score("peer1")) }) t.Run("bad responses score", func(t *testing.T) { s, pids := setupScorer() + zeroScore := s.BlockProviderScorer().MaxScore() // Update peers' stats and test the effect on peer order. s.BadResponsesScorer().Increment("peer2") - assert.DeepEqual(t, pack(s, 0, -0.2, 0), peerScores(s, pids), "Unexpected scores") + assert.DeepEqual(t, pack(s, zeroScore, zeroScore-0.2, zeroScore), peerScores(s, pids), "Unexpected scores") s.BadResponsesScorer().Increment("peer1") s.BadResponsesScorer().Increment("peer1") - assert.DeepEqual(t, pack(s, -0.4, -0.2, 0), peerScores(s, pids), "Unexpected scores") + assert.DeepEqual(t, pack(s, zeroScore-0.4, zeroScore-0.2, zeroScore), peerScores(s, pids), "Unexpected scores") // See how decaying affects order of peers. s.BadResponsesScorer().Decay() - assert.DeepEqual(t, pack(s, -0.2, 0, 0), peerScores(s, pids), "Unexpected scores") + assert.DeepEqual(t, pack(s, zeroScore-0.2, zeroScore, zeroScore), peerScores(s, pids), "Unexpected scores") s.BadResponsesScorer().Decay() - assert.DeepEqual(t, pack(s, 0, 0, 0), peerScores(s, pids), "Unexpected scores") + assert.DeepEqual(t, pack(s, zeroScore, zeroScore, zeroScore), peerScores(s, pids), "Unexpected scores") }) t.Run("block providers score", func(t *testing.T) { s, pids := setupScorer() s1 := s.BlockProviderScorer() + zeroScore := s.BlockProviderScorer().MaxScore() // Partial batch. s1.IncrementProcessedBlocks("peer1", batchSize/4) @@ -151,11 +163,11 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) { // Single batch. s1.IncrementProcessedBlocks("peer1", batchSize) - assert.DeepEqual(t, pack(s, 0.05, 0, 0), peerScores(s, pids), "Unexpected scores") + assert.DeepEqual(t, pack(s, 0.05, zeroScore, zeroScore), peerScores(s, pids), "Unexpected scores") // Multiple batches. s1.IncrementProcessedBlocks("peer2", batchSize*4) - assert.DeepEqual(t, pack(s, 0.05, 0.05*4, 0), peerScores(s, pids), "Unexpected scores") + assert.DeepEqual(t, pack(s, 0.05, 0.05*4, zeroScore), peerScores(s, pids), "Unexpected scores") // Partial batch. s1.IncrementProcessedBlocks("peer3", batchSize/2) diff --git a/beacon-chain/p2p/peers/store.go b/beacon-chain/p2p/peers/store.go index 6c28f85b37..79b8da3a22 100644 --- a/beacon-chain/p2p/peers/store.go +++ b/beacon-chain/p2p/peers/store.go @@ -38,6 +38,7 @@ type peerData struct { chainStateLastUpdated time.Time badResponses int processedBlocks uint64 + blockProviderUpdated time.Time } // newPeerDataStore creates peer store.
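
Usage note (not part of the patch): below is a minimal, hypothetical sketch of how a consumer, for example a blocks fetcher, might drive the API introduced here: weight-sorted peer selection, staleness reset via Touch, and an on-the-fly custom scoring function. Only the scorer methods themselves (NewStatus, IncrementProcessedBlocks, Touch, WeightSorted, Sorted, Params, FormatScorePretty) come from this change; the main() wiring, peer IDs, penalty value, and the assumption of a 64-block batch size are illustrative. rand.NewDeterministicGenerator mirrors what the tests above use; a production caller would normally use a non-deterministic source.

package main

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
	"github.com/prysmaticlabs/prysm/shared/rand"
)

func main() {
	ctx := context.Background()
	peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
		PeerLimit: 30,
		ScorerParams: &peers.PeerScorerConfig{
			// Zero values fall back to the Default* constants defined in this patch.
			BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{},
		},
	})
	scorer := peerStatuses.Scorers().BlockProviderScorer()

	// Simulate some processed batches (assuming the default batch size of 64).
	// New or stale peers start at MaxScore(), so they are not starved out of fetching.
	pids := []peer.ID{"peer1", "peer2", "peer3"}
	scorer.IncrementProcessedBlocks("peer1", 64*3)
	scorer.IncrementProcessedBlocks("peer2", 64)
	scorer.Touch("peer3") // registered, but no blocks processed yet

	// Weighted selection: higher-scored peers are more likely to be picked first,
	// but lower-scored peers still get a chance to participate.
	r := rand.NewDeterministicGenerator()
	weighted := scorer.WeightSorted(r, pids, nil)
	fmt.Println("weight-sorted:", weighted)

	// A custom scoring function can adjust scores on the fly, e.g. to de-prioritize
	// a peer that just failed a request (the penalty shown here is arbitrary).
	deprioritized := peer.ID("peer2")
	custom := scorer.Sorted(pids, func(pid peer.ID, score float64) float64 {
		if pid == deprioritized {
			return score - scorer.Params().ProcessedBatchWeight
		}
		return score
	})
	for _, pid := range custom {
		fmt.Println(pid, scorer.FormatScorePretty(pid))
	}
}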