Refactor peer scorer into peerdata + scorers (#7452)

* updates comment

* manager -> service

* rename receiver

* refacgtor bad_responses

* refactor store

* update status service

* extends data service

* status service test

* refactor block provider scorer

* misc updates

* fix tests

* data -> peerdata

* gazelle

* peerdata/store test

* limit the visibility scope

* Nishant's suggestion

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Victor Farazdagi
2020-10-07 16:08:51 +03:00
committed by GitHub
parent ae78546323
commit 9ce64e2428
28 changed files with 722 additions and 480 deletions

View File

@@ -42,6 +42,8 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
"//shared/featureconfig:go_default_library",
@@ -115,6 +117,7 @@ go_test(
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/testing:go_default_library",

View File

@@ -9,6 +9,7 @@ import (
"testing"
"time"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -157,7 +158,7 @@ func TestService_BroadcastAttestation(t *testing.T) {
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
}),
}
@@ -322,7 +323,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
}),
}
@@ -337,7 +338,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
}),
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
@@ -26,8 +27,8 @@ func TestPeer_AtMaxLimit(t *testing.T) {
}
s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 0,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 3,
},
},
@@ -95,8 +96,8 @@ func TestPeer_BelowMaxLimit(t *testing.T) {
}
s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 1,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 3,
},
},

View File

@@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/shared/timeutils"
"github.com/sirupsen/logrus"
)
@@ -109,7 +110,7 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
}
// If peer hasn't sent a status request, we disconnect with them
if _, err := s.peers.ChainState(remotePeer); err == peers.ErrPeerUnknown {
if _, err := s.peers.ChainState(remotePeer); err == peerdata.ErrPeerUnknown {
disconnectFromPeer()
return
}

View File

@@ -3,29 +3,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_library(
name = "go_default_library",
srcs = [
"score_bad_responses.go",
"score_block_providers.go",
"scorer_manager.go",
"status.go",
"store.go",
],
srcs = ["status.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/rand:go_default_library",
"//shared/timeutils:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)
@@ -35,21 +27,18 @@ go_test(
srcs = [
"benchmark_test.go",
"peers_test.go",
"score_bad_responses_test.go",
"score_block_providers_test.go",
"scorer_manager_test.go",
"status_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/rand:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//shared/timeutils:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",

View File

@@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["store.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["store_test.go"],
embed = [":go_default_library"],
deps = [
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
],
)

View File

@@ -0,0 +1,107 @@
package peerdata
import (
"context"
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
var (
// ErrPeerUnknown is returned when there is an attempt to obtain data from a peer that is not known.
ErrPeerUnknown = errors.New("peer unknown")
)
// PeerConnectionState is the state of the connection.
type PeerConnectionState ethpb.ConnectionState
// StoreConfig holds peer store parameters.
type StoreConfig struct {
MaxPeers int
}
// Store is a container for various peer related data (both protocol and app level).
// Container implements RWMutex, so data access can be restricted on the container level. This allows
// different components rely on the very same peer map container.
// Note: access to data is controlled by clients i.e. client code is responsible for locking/unlocking
// the mutex when accessing data.
type Store struct {
sync.RWMutex
ctx context.Context
config *StoreConfig
peers map[peer.ID]*PeerData
}
// PeerData aggregates protocol and application level info about a single peer.
type PeerData struct {
// Network related data.
Address ma.Multiaddr
Direction network.Direction
ConnState PeerConnectionState
Enr *enr.Record
// Chain related data.
ChainState *pb.Status
MetaData *pb.MetaData
ChainStateLastUpdated time.Time
// Scorers related data.
BadResponses int
ProcessedBlocks uint64
BlockProviderUpdated time.Time
}
// NewStore creates new peer data store.
func NewStore(ctx context.Context, config *StoreConfig) *Store {
return &Store{
ctx: ctx,
config: config,
peers: make(map[peer.ID]*PeerData),
}
}
// PeerData returns data associated with a given peer, if any.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) PeerData(pid peer.ID) (*PeerData, bool) {
peerData, ok := s.peers[pid]
return peerData, ok
}
// PeerDataGetOrCreate returns data associated with a given peer.
// If no data has been associated yet, newly created and associated data object is returned.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) PeerDataGetOrCreate(pid peer.ID) *PeerData {
if peerData, ok := s.peers[pid]; ok {
return peerData
}
s.peers[pid] = &PeerData{}
return s.peers[pid]
}
// SetPeerData updates data associated with a given peer.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) SetPeerData(pid peer.ID, data *PeerData) {
s.peers[pid] = data
}
// DeletePeerData removes data associated with a given peer.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) DeletePeerData(pid peer.ID) {
delete(s.peers, pid)
}
// Peers returns map of peer data objects.
// Important: it is assumed that store mutex is locked when calling this method.
func (s *Store) Peers() map[peer.ID]*PeerData {
return s.peers
}
// Config exposes store configuration params.
func (s *Store) Config() *StoreConfig {
return s.config
}

View File

@@ -0,0 +1,82 @@
package peerdata_test
import (
"context"
"testing"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestStore_GetSetDelete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := peerdata.NewStore(ctx, &peerdata.StoreConfig{
MaxPeers: 12,
})
assert.NotNil(t, store)
assert.Equal(t, 12, store.Config().MaxPeers)
// Check non-existent data.
pid := peer.ID("00001")
peerData, ok := store.PeerData(pid)
assert.Equal(t, false, ok)
assert.Equal(t, (*peerdata.PeerData)(nil), peerData)
assert.Equal(t, 0, len(store.Peers()))
// Associate data.
store.SetPeerData(pid, &peerdata.PeerData{
BadResponses: 3,
ProcessedBlocks: 42,
})
peerData, ok = store.PeerData(pid)
assert.Equal(t, true, ok)
assert.Equal(t, 3, peerData.BadResponses)
assert.Equal(t, uint64(42), peerData.ProcessedBlocks)
require.Equal(t, 1, len(store.Peers()))
peers := store.Peers()
_, ok = peers[pid]
require.Equal(t, true, ok)
assert.Equal(t, 3, peers[pid].BadResponses)
assert.Equal(t, uint64(42), peers[pid].ProcessedBlocks)
// Remove data from peer.
store.DeletePeerData(pid)
peerData, ok = store.PeerData(pid)
assert.Equal(t, false, ok)
assert.Equal(t, (*peerdata.PeerData)(nil), peerData)
assert.Equal(t, 0, len(store.Peers()))
}
func TestStore_PeerDataGetOrCreate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := peerdata.NewStore(ctx, &peerdata.StoreConfig{
MaxPeers: 12,
})
assert.NotNil(t, store)
assert.Equal(t, 12, store.Config().MaxPeers)
pid := peer.ID("00001")
peerData, ok := store.PeerData(pid)
assert.Equal(t, false, ok)
assert.Equal(t, (*peerdata.PeerData)(nil), peerData)
assert.Equal(t, 0, len(store.Peers()))
peerData = store.PeerDataGetOrCreate(pid)
assert.NotNil(t, peerData)
assert.Equal(t, 0, peerData.BadResponses)
assert.Equal(t, uint64(0), peerData.ProcessedBlocks)
require.Equal(t, 1, len(store.Peers()))
// Method must be idempotent, check.
peerData = store.PeerDataGetOrCreate(pid)
assert.NotNil(t, peerData)
assert.Equal(t, 0, peerData.BadResponses)
assert.Equal(t, uint64(0), peerData.ProcessedBlocks)
require.Equal(t, 1, len(store.Peers()))
}

View File

@@ -2,12 +2,10 @@ package peers_test
import (
"io/ioutil"
"math"
"os"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/sirupsen/logrus"
)
@@ -35,8 +33,3 @@ func TestMain(m *testing.M) {
flags.Init(resetFlags)
os.Exit(code)
}
// roundScore returns score rounded in accordance with the score manager's rounding factor.
func roundScore(score float64) float64 {
return math.Round(score*peers.ScoreRoundingFactor) / peers.ScoreRoundingFactor
}

View File

@@ -1,85 +0,0 @@
package peers
import (
"context"
"math"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
// ScoreRoundingFactor defines how many digits to keep in decimal part.
// This parameter is used in math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor.
const ScoreRoundingFactor = 10000
// PeerScorerManager keeps track of peer scorers that are used to calculate overall peer score.
type PeerScorerManager struct {
ctx context.Context
store *peerDataStore
scorers struct {
badResponsesScorer *BadResponsesScorer
blockProviderScorer *BlockProviderScorer
}
}
// PeerScorerConfig holds configuration parameters for scoring service.
type PeerScorerConfig struct {
BadResponsesScorerConfig *BadResponsesScorerConfig
BlockProviderScorerConfig *BlockProviderScorerConfig
}
// newPeerScorerManager provides fully initialized peer scoring service.
func newPeerScorerManager(ctx context.Context, store *peerDataStore, config *PeerScorerConfig) *PeerScorerManager {
mgr := &PeerScorerManager{
ctx: ctx,
store: store,
}
mgr.scorers.badResponsesScorer = newBadResponsesScorer(ctx, store, config.BadResponsesScorerConfig)
mgr.scorers.blockProviderScorer = newBlockProviderScorer(ctx, store, config.BlockProviderScorerConfig)
go mgr.loop(mgr.ctx)
return mgr
}
// BadResponsesScorer exposes bad responses scoring service.
func (m *PeerScorerManager) BadResponsesScorer() *BadResponsesScorer {
return m.scorers.badResponsesScorer
}
// BlockProviderScorer exposes block provider scoring service.
func (m *PeerScorerManager) BlockProviderScorer() *BlockProviderScorer {
return m.scorers.blockProviderScorer
}
// Score returns calculated peer score across all tracked metrics.
func (m *PeerScorerManager) Score(pid peer.ID) float64 {
m.store.RLock()
defer m.store.RUnlock()
score := float64(0)
if _, ok := m.store.peers[pid]; !ok {
return 0
}
score += m.scorers.badResponsesScorer.score(pid)
score += m.scorers.blockProviderScorer.score(pid)
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
}
// loop handles background tasks.
func (m *PeerScorerManager) loop(ctx context.Context) {
decayBadResponsesStats := time.NewTicker(m.scorers.badResponsesScorer.Params().DecayInterval)
defer decayBadResponsesStats.Stop()
decayBlockProviderStats := time.NewTicker(m.scorers.blockProviderScorer.Params().DecayInterval)
defer decayBlockProviderStats.Stop()
for {
select {
case <-decayBadResponsesStats.C:
m.scorers.badResponsesScorer.Decay()
case <-decayBlockProviderStats.C:
m.scorers.blockProviderScorer.Decay()
case <-ctx.Done():
return
}
}
}

View File

@@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"bad_responses.go",
"block_providers.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/rand:go_default_library",
"//shared/timeutils:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"bad_responses_test.go",
"block_providers_test.go",
"scorers_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/rand:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//shared/timeutils:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -1,10 +1,11 @@
package peers
package scorers
import (
"context"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
)
const (
@@ -21,7 +22,7 @@ const (
type BadResponsesScorer struct {
ctx context.Context
config *BadResponsesScorerConfig
store *peerDataStore
store *peerdata.Store
}
// BadResponsesScorerConfig holds configuration parameters for bad response scoring service.
@@ -36,7 +37,7 @@ type BadResponsesScorerConfig struct {
// newBadResponsesScorer creates new bad responses scoring service.
func newBadResponsesScorer(
ctx context.Context, store *peerDataStore, config *BadResponsesScorerConfig) *BadResponsesScorer {
ctx context.Context, store *peerdata.Store, config *BadResponsesScorerConfig) *BadResponsesScorer {
if config == nil {
config = &BadResponsesScorerConfig{}
}
@@ -67,12 +68,12 @@ func (s *BadResponsesScorer) Score(pid peer.ID) float64 {
// score is a lock-free version of ScoreBadResponses.
func (s *BadResponsesScorer) score(pid peer.ID) float64 {
score := float64(0)
peerData, ok := s.store.peers[pid]
peerData, ok := s.store.PeerData(pid)
if !ok {
return score
}
if peerData.badResponses > 0 {
score = float64(peerData.badResponses) / float64(s.config.Threshold)
if peerData.BadResponses > 0 {
score = float64(peerData.BadResponses) / float64(s.config.Threshold)
score = score * s.config.Weight
}
return score
@@ -92,10 +93,10 @@ func (s *BadResponsesScorer) Count(pid peer.ID) (int, error) {
// count is a lock-free version of Count.
func (s *BadResponsesScorer) count(pid peer.ID) (int, error) {
if peerData, ok := s.store.peers[pid]; ok {
return peerData.badResponses, nil
if peerData, ok := s.store.PeerData(pid); ok {
return peerData.BadResponses, nil
}
return -1, ErrPeerUnknown
return -1, peerdata.ErrPeerUnknown
}
// Increment increments the number of bad responses we have received from the given remote peer.
@@ -104,13 +105,14 @@ func (s *BadResponsesScorer) Increment(pid peer.ID) {
s.store.Lock()
defer s.store.Unlock()
if _, ok := s.store.peers[pid]; !ok {
s.store.peers[pid] = &peerData{
badResponses: 1,
}
peerData, ok := s.store.PeerData(pid)
if !ok {
s.store.SetPeerData(pid, &peerdata.PeerData{
BadResponses: 1,
})
return
}
s.store.peers[pid].badResponses++
peerData.BadResponses++
}
// IsBadPeer states if the peer is to be considered bad.
@@ -123,8 +125,8 @@ func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) bool {
// isBadPeer is lock-free version of IsBadPeer.
func (s *BadResponsesScorer) isBadPeer(pid peer.ID) bool {
if peerData, ok := s.store.peers[pid]; ok {
return peerData.badResponses >= s.config.Threshold
if peerData, ok := s.store.PeerData(pid); ok {
return peerData.BadResponses >= s.config.Threshold
}
return false
}
@@ -135,7 +137,7 @@ func (s *BadResponsesScorer) BadPeers() []peer.ID {
defer s.store.RUnlock()
badPeers := make([]peer.ID, 0)
for pid := range s.store.peers {
for pid := range s.store.Peers() {
if s.isBadPeer(pid) {
badPeers = append(badPeers, pid)
}
@@ -150,9 +152,9 @@ func (s *BadResponsesScorer) Decay() {
s.store.Lock()
defer s.store.Unlock()
for _, peerData := range s.store.peers {
if peerData.badResponses > 0 {
peerData.badResponses--
for _, peerData := range s.store.Peers() {
if peerData.BadResponses > 0 {
peerData.BadResponses--
}
}
}

View File

@@ -1,4 +1,4 @@
package peers_test
package scorers_test
import (
"context"
@@ -8,18 +8,20 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestPeerScorer_BadResponses_Score(t *testing.T) {
func TestScorers_BadResponses_Score(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 4,
},
},
@@ -37,15 +39,15 @@ func TestPeerScorer_BadResponses_Score(t *testing.T) {
assert.Equal(t, true, scorer.IsBadPeer("peer1"))
}
func TestPeerScorer_BadResponses_ParamsThreshold(t *testing.T) {
func TestScorers_BadResponses_ParamsThreshold(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxBadResponses := 2
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -54,19 +56,19 @@ func TestPeerScorer_BadResponses_ParamsThreshold(t *testing.T) {
assert.Equal(t, maxBadResponses, scorer.BadResponsesScorer().Params().Threshold)
}
func TestPeerScorer_BadResponses_Count(t *testing.T) {
func TestScorers_BadResponses_Count(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
})
scorer := peerStatuses.Scorers()
pid := peer.ID("peer1")
_, err := scorer.BadResponsesScorer().Count(pid)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
assert.ErrorContains(t, peerdata.ErrPeerUnknown.Error(), err)
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
count, err := scorer.BadResponsesScorer().Count(pid)
@@ -74,15 +76,15 @@ func TestPeerScorer_BadResponses_Count(t *testing.T) {
assert.Equal(t, 0, count)
}
func TestPeerScorer_BadResponses_Decay(t *testing.T) {
func TestScorers_BadResponses_Decay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxBadResponses := 2
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
Weight: 1,
},
@@ -131,13 +133,13 @@ func TestPeerScorer_BadResponses_Decay(t *testing.T) {
assert.Equal(t, 1, badResponses, "unexpected bad responses for pid3")
}
func TestPeerScorer_BadResponses_IsBadPeer(t *testing.T) {
func TestScorers_BadResponses_IsBadPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
})
scorer := peerStatuses.Scorers().BadResponsesScorer()
pid := peer.ID("peer1")
@@ -146,9 +148,9 @@ func TestPeerScorer_BadResponses_IsBadPeer(t *testing.T) {
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
assert.Equal(t, false, scorer.IsBadPeer(pid))
for i := 0; i < peers.DefaultBadResponsesThreshold; i++ {
for i := 0; i < scorers.DefaultBadResponsesThreshold; i++ {
scorer.Increment(pid)
if i == peers.DefaultBadResponsesThreshold-1 {
if i == scorers.DefaultBadResponsesThreshold-1 {
assert.Equal(t, true, scorer.IsBadPeer(pid), "Unexpected peer status")
} else {
assert.Equal(t, false, scorer.IsBadPeer(pid), "Unexpected peer status")
@@ -156,20 +158,20 @@ func TestPeerScorer_BadResponses_IsBadPeer(t *testing.T) {
}
}
func TestPeerScorer_BadResponses_BadPeers(t *testing.T) {
func TestScorers_BadResponses_BadPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
})
scorer := peerStatuses.Scorers().BadResponsesScorer()
pids := []peer.ID{peer.ID("peer1"), peer.ID("peer2"), peer.ID("peer3"), peer.ID("peer4"), peer.ID("peer5")}
for i := 0; i < len(pids); i++ {
peerStatuses.Add(nil, pids[i], nil, network.DirUnknown)
}
for i := 0; i < peers.DefaultBadResponsesThreshold; i++ {
for i := 0; i < scorers.DefaultBadResponsesThreshold; i++ {
scorer.Increment(pids[1])
scorer.Increment(pids[2])
scorer.Increment(pids[4])

View File

@@ -1,4 +1,4 @@
package peers
package scorers
import (
"context"
@@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/timeutils"
@@ -36,7 +37,7 @@ const (
type BlockProviderScorer struct {
ctx context.Context
config *BlockProviderScorerConfig
store *peerDataStore
store *peerdata.Store
// maxScore is a cached value for maximum attainable block provider score.
// It is calculated, on startup, as following: (processedBlocksCap / batchSize) * batchWeight.
maxScore float64
@@ -62,7 +63,7 @@ type BlockProviderScorerConfig struct {
// newBlockProviderScorer creates block provider scoring service.
func newBlockProviderScorer(
ctx context.Context, store *peerDataStore, config *BlockProviderScorerConfig) *BlockProviderScorer {
ctx context.Context, store *peerdata.Store, config *BlockProviderScorerConfig) *BlockProviderScorer {
if config == nil {
config = &BlockProviderScorerConfig{}
}
@@ -106,14 +107,14 @@ func (s *BlockProviderScorer) Score(pid peer.ID) float64 {
// score is a lock-free version of Score.
func (s *BlockProviderScorer) score(pid peer.ID) float64 {
score := float64(0)
peerData, ok := s.store.peers[pid]
peerData, ok := s.store.PeerData(pid)
// Boost score of new peers or peers that haven't been accessed for too long.
if !ok || time.Since(peerData.blockProviderUpdated) >= s.config.StalePeerRefreshInterval {
if !ok || time.Since(peerData.BlockProviderUpdated) >= s.config.StalePeerRefreshInterval {
return s.maxScore
}
batchSize := uint64(flags.Get().BlockBatchLimit)
if batchSize > 0 {
processedBatches := float64(peerData.processedBlocks / batchSize)
processedBatches := float64(peerData.ProcessedBlocks / batchSize)
score += processedBatches * s.config.ProcessedBatchWeight
}
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
@@ -133,14 +134,12 @@ func (s *BlockProviderScorer) IncrementProcessedBlocks(pid peer.ID, cnt uint64)
if cnt <= 0 {
return
}
if _, ok := s.store.peers[pid]; !ok {
s.store.peers[pid] = &peerData{}
}
if s.store.peers[pid].processedBlocks+cnt > s.config.ProcessedBlocksCap {
cnt = s.config.ProcessedBlocksCap - s.store.peers[pid].processedBlocks
peerData := s.store.PeerDataGetOrCreate(pid)
if peerData.ProcessedBlocks+cnt > s.config.ProcessedBlocksCap {
cnt = s.config.ProcessedBlocksCap - peerData.ProcessedBlocks
}
if cnt > 0 {
s.store.peers[pid].processedBlocks += cnt
peerData.ProcessedBlocks += cnt
}
}
@@ -154,13 +153,11 @@ func (s *BlockProviderScorer) Touch(pid peer.ID, t ...time.Time) {
// touch is a lock-free version of Touch.
func (s *BlockProviderScorer) touch(pid peer.ID, t ...time.Time) {
if _, ok := s.store.peers[pid]; !ok {
s.store.peers[pid] = &peerData{}
}
peerData := s.store.PeerDataGetOrCreate(pid)
if len(t) == 1 {
s.store.peers[pid].blockProviderUpdated = t[0]
peerData.BlockProviderUpdated = t[0]
} else {
s.store.peers[pid].blockProviderUpdated = timeutils.Now()
peerData.BlockProviderUpdated = timeutils.Now()
}
}
@@ -173,8 +170,8 @@ func (s *BlockProviderScorer) ProcessedBlocks(pid peer.ID) uint64 {
// processedBlocks is a lock-free version of ProcessedBlocks.
func (s *BlockProviderScorer) processedBlocks(pid peer.ID) uint64 {
if peerData, ok := s.store.peers[pid]; ok {
return peerData.processedBlocks
if peerData, ok := s.store.PeerData(pid); ok {
return peerData.ProcessedBlocks
}
return 0
}
@@ -186,11 +183,11 @@ func (s *BlockProviderScorer) Decay() {
s.store.Lock()
defer s.store.Unlock()
for _, peerData := range s.store.peers {
if peerData.processedBlocks > s.config.Decay {
peerData.processedBlocks -= s.config.Decay
for _, peerData := range s.store.Peers() {
if peerData.ProcessedBlocks > s.config.Decay {
peerData.ProcessedBlocks -= s.config.Decay
} else {
peerData.processedBlocks = 0
peerData.ProcessedBlocks = 0
}
}
}

View File

@@ -1,4 +1,4 @@
package peers_test
package scorers_test
import (
"context"
@@ -10,106 +10,107 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/timeutils"
)
func TestPeerScorer_BlockProvider_Score(t *testing.T) {
func TestScorers_BlockProvider_Score(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
batchSize := uint64(flags.Get().BlockBatchLimit)
tests := []struct {
name string
update func(scorer *peers.BlockProviderScorer)
check func(scorer *peers.BlockProviderScorer)
update func(scorer *scorers.BlockProviderScorer)
check func(scorer *scorers.BlockProviderScorer)
}{
{
name: "nonexistent peer",
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
assert.Equal(t, scorer.MaxScore(), scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "existent peer with zero score",
update: func(scorer *peers.BlockProviderScorer) {
update: func(scorer *scorers.BlockProviderScorer) {
scorer.Touch("peer1")
},
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "existent peer via increment",
update: func(scorer *peers.BlockProviderScorer) {
update: func(scorer *scorers.BlockProviderScorer) {
scorer.IncrementProcessedBlocks("peer1", 0)
},
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "boost score of stale peer",
update: func(scorer *peers.BlockProviderScorer) {
update: func(scorer *scorers.BlockProviderScorer) {
batchWeight := scorer.Params().ProcessedBatchWeight
scorer.IncrementProcessedBlocks("peer1", batchSize*3)
assert.Equal(t, roundScore(batchWeight*3), scorer.Score("peer1"), "Unexpected score")
scorer.Touch("peer1", timeutils.Now().Add(-1*scorer.Params().StalePeerRefreshInterval))
},
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
assert.Equal(t, scorer.MaxScore(), scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "increment with 0 score",
update: func(scorer *peers.BlockProviderScorer) {
update: func(scorer *scorers.BlockProviderScorer) {
// Increment to zero (provider is added to cache but score is unchanged).
scorer.IncrementProcessedBlocks("peer1", 0)
},
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "partial score",
update: func(scorer *peers.BlockProviderScorer) {
update: func(scorer *scorers.BlockProviderScorer) {
// Partial score (less than a single batch of blocks processed).
scorer.IncrementProcessedBlocks("peer1", batchSize/2)
},
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "single batch",
update: func(scorer *peers.BlockProviderScorer) {
update: func(scorer *scorers.BlockProviderScorer) {
scorer.IncrementProcessedBlocks("peer1", batchSize)
},
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
batchWeight := scorer.Params().ProcessedBatchWeight
assert.Equal(t, roundScore(batchWeight), scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "multiple batches",
update: func(scorer *peers.BlockProviderScorer) {
update: func(scorer *scorers.BlockProviderScorer) {
scorer.IncrementProcessedBlocks("peer1", batchSize*7)
},
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
batchWeight := scorer.Params().ProcessedBatchWeight
assert.Equal(t, roundScore(batchWeight*7), scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "maximum score cap",
update: func(scorer *peers.BlockProviderScorer) {
update: func(scorer *scorers.BlockProviderScorer) {
batchWeight := scorer.Params().ProcessedBatchWeight
scorer.IncrementProcessedBlocks("peer1", batchSize*2)
assert.Equal(t, roundScore(batchWeight*2), scorer.Score("peer1"), "Unexpected score")
scorer.IncrementProcessedBlocks("peer1", scorer.Params().ProcessedBlocksCap)
},
check: func(scorer *peers.BlockProviderScorer) {
check: func(scorer *scorers.BlockProviderScorer) {
assert.Equal(t, scorer.Params().ProcessedBlocksCap, scorer.ProcessedBlocks("peer1"))
assert.Equal(t, 1.0, scorer.Score("peer1"))
},
@@ -120,8 +121,8 @@ func TestPeerScorer_BlockProvider_Score(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{},
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{},
},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
@@ -133,12 +134,12 @@ func TestPeerScorer_BlockProvider_Score(t *testing.T) {
}
}
func TestPeerScorer_BlockProvider_GettersSetters(t *testing.T) {
func TestScorers_BlockProvider_GettersSetters(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
@@ -147,12 +148,12 @@ func TestPeerScorer_BlockProvider_GettersSetters(t *testing.T) {
assert.Equal(t, uint64(64), scorer.ProcessedBlocks("peer1"))
}
func TestPeerScorer_BlockProvider_WeightSorted(t *testing.T) {
func TestScorers_BlockProvider_WeightSorted(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.01,
},
},
@@ -211,24 +212,24 @@ func TestPeerScorer_BlockProvider_WeightSorted(t *testing.T) {
assert.DeepEqual(t, reverse(pids), scoredPIDs, "Expected items with more weight to be picked more often")
}
func TestPeerScorer_BlockProvider_Sorted(t *testing.T) {
func TestScorers_BlockProvider_Sorted(t *testing.T) {
batchSize := uint64(flags.Get().BlockBatchLimit)
tests := []struct {
name string
update func(s *peers.BlockProviderScorer)
update func(s *scorers.BlockProviderScorer)
score func(pid peer.ID, score float64) float64
have []peer.ID
want []peer.ID
}{
{
name: "no peers",
update: func(s *peers.BlockProviderScorer) {},
update: func(s *scorers.BlockProviderScorer) {},
have: []peer.ID{},
want: []peer.ID{},
},
{
name: "same scores",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", 16)
s.IncrementProcessedBlocks("peer2", 16)
s.IncrementProcessedBlocks("peer3", 16)
@@ -238,7 +239,7 @@ func TestPeerScorer_BlockProvider_Sorted(t *testing.T) {
},
{
name: "same scores multiple batches",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*7+16)
s.IncrementProcessedBlocks("peer2", batchSize*7+16)
s.IncrementProcessedBlocks("peer3", batchSize*7+16)
@@ -248,7 +249,7 @@ func TestPeerScorer_BlockProvider_Sorted(t *testing.T) {
},
{
name: "same scores multiple batches unequal blocks",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*7+6)
s.IncrementProcessedBlocks("peer2", batchSize*7+16)
s.IncrementProcessedBlocks("peer3", batchSize*7+26)
@@ -258,7 +259,7 @@ func TestPeerScorer_BlockProvider_Sorted(t *testing.T) {
},
{
name: "different scores",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*3)
s.IncrementProcessedBlocks("peer2", batchSize*1)
s.IncrementProcessedBlocks("peer3", batchSize*2)
@@ -268,7 +269,7 @@ func TestPeerScorer_BlockProvider_Sorted(t *testing.T) {
},
{
name: "custom scorer",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*3)
s.IncrementProcessedBlocks("peer2", batchSize*1)
s.IncrementProcessedBlocks("peer3", batchSize*2)
@@ -291,8 +292,8 @@ func TestPeerScorer_BlockProvider_Sorted(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.2,
},
},
@@ -304,24 +305,24 @@ func TestPeerScorer_BlockProvider_Sorted(t *testing.T) {
}
}
func TestPeerScorer_BlockProvider_MaxScore(t *testing.T) {
func TestScorers_BlockProvider_MaxScore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
batchSize := uint64(flags.Get().BlockBatchLimit)
tests := []struct {
name string
cfg *peers.BlockProviderScorerConfig
cfg *scorers.BlockProviderScorerConfig
want float64
}{
{
name: "default config",
cfg: &peers.BlockProviderScorerConfig{},
cfg: &scorers.BlockProviderScorerConfig{},
want: 1.0,
},
{
name: "custom config",
cfg: &peers.BlockProviderScorerConfig{
cfg: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.5,
ProcessedBlocksCap: batchSize * 300,
},
@@ -332,7 +333,7 @@ func TestPeerScorer_BlockProvider_MaxScore(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: tt.cfg,
},
})
@@ -342,7 +343,7 @@ func TestPeerScorer_BlockProvider_MaxScore(t *testing.T) {
}
}
func TestPeerScorer_BlockProvider_FormatScorePretty(t *testing.T) {
func TestScorers_BlockProvider_FormatScorePretty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
batchSize := uint64(flags.Get().BlockBatchLimit)
@@ -350,74 +351,74 @@ func TestPeerScorer_BlockProvider_FormatScorePretty(t *testing.T) {
tests := []struct {
name string
update func(s *peers.BlockProviderScorer)
check func(s *peers.BlockProviderScorer)
update func(s *scorers.BlockProviderScorer)
check func(s *scorers.BlockProviderScorer)
}{
{
name: "peer not registered",
update: nil,
check: func(s *peers.BlockProviderScorer) {
check: func(s *scorers.BlockProviderScorer) {
assert.Equal(t, fmt.Sprintf(format, 100.0, 1.0, 0), s.FormatScorePretty("peer1"))
},
},
{
name: "peer registered zero blocks",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.Touch("peer1")
},
check: func(s *peers.BlockProviderScorer) {
check: func(s *scorers.BlockProviderScorer) {
assert.Equal(t, fmt.Sprintf(format, 0.0, 0.0, 0), s.FormatScorePretty("peer1"))
},
},
{
name: "partial batch",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize/4)
},
check: func(s *peers.BlockProviderScorer) {
check: func(s *scorers.BlockProviderScorer) {
assert.Equal(t, fmt.Sprintf(format, 0.0, 0.0, batchSize/4), s.FormatScorePretty("peer1"))
},
},
{
name: "single batch",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize)
},
check: func(s *peers.BlockProviderScorer) {
check: func(s *scorers.BlockProviderScorer) {
assert.Equal(t, fmt.Sprintf(format, 5.0, 0.05, batchSize), s.FormatScorePretty("peer1"))
},
},
{
name: "3/2 of a batch",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*3/2)
},
check: func(s *peers.BlockProviderScorer) {
check: func(s *scorers.BlockProviderScorer) {
assert.Equal(t, fmt.Sprintf(format, 5.0, 0.05, batchSize*3/2), s.FormatScorePretty("peer1"))
},
},
{
name: "multiple batches",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
},
check: func(s *peers.BlockProviderScorer) {
check: func(s *scorers.BlockProviderScorer) {
assert.Equal(t, fmt.Sprintf(format, 25.0, 0.05*5, batchSize*5), s.FormatScorePretty("peer1"))
},
},
{
name: "multiple batches max score",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", s.Params().ProcessedBlocksCap*5)
},
check: func(s *peers.BlockProviderScorer) {
check: func(s *scorers.BlockProviderScorer) {
want := fmt.Sprintf(format, 100.0, 1.0, s.Params().ProcessedBlocksCap)
assert.Equal(t, want, s.FormatScorePretty("peer1"))
},
},
{
name: "decaying",
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("peer1", batchSize*5)
s.IncrementProcessedBlocks("peer1", batchSize)
s.IncrementProcessedBlocks("peer1", batchSize/4)
@@ -430,7 +431,7 @@ func TestPeerScorer_BlockProvider_FormatScorePretty(t *testing.T) {
// Half of blocks is to be decayed.
s.Decay()
},
check: func(s *peers.BlockProviderScorer) {
check: func(s *scorers.BlockProviderScorer) {
want := fmt.Sprintf(format, 50.0, 0.5, s.Params().ProcessedBlocksCap/2)
assert.Equal(t, want, s.FormatScorePretty("peer1"))
},
@@ -440,8 +441,8 @@ func TestPeerScorer_BlockProvider_FormatScorePretty(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
ProcessedBlocksCap: 20 * batchSize,
Decay: 10 * batchSize,

View File

@@ -0,0 +1,42 @@
package scorers_test
import (
"io/ioutil"
"math"
"os"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/sirupsen/logrus"
)
func TestMain(m *testing.M) {
logrus.SetLevel(logrus.DebugLevel)
logrus.SetOutput(ioutil.Discard)
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
EnablePeerScorer: true,
})
defer resetCfg()
resetFlags := flags.Get()
flags.Init(&flags.GlobalFlags{
BlockBatchLimit: 64,
BlockBatchLimitBurstFactor: 10,
})
defer func() {
flags.Init(resetFlags)
}()
code := m.Run()
// os.Exit will prevent defer from being called
resetCfg()
flags.Init(resetFlags)
os.Exit(code)
}
// roundScore returns score rounded in accordance with the score manager's rounding factor.
func roundScore(score float64) float64 {
return math.Round(score*scorers.ScoreRoundingFactor) / scorers.ScoreRoundingFactor
}

View File

@@ -0,0 +1,86 @@
package scorers
import (
"context"
"math"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
)
// ScoreRoundingFactor defines how many digits to keep in decimal part.
// This parameter is used in math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor.
const ScoreRoundingFactor = 10000
// Service manages peer scorers that are used to calculate overall peer score.
type Service struct {
ctx context.Context
store *peerdata.Store
scorers struct {
badResponsesScorer *BadResponsesScorer
blockProviderScorer *BlockProviderScorer
}
}
// Config holds configuration parameters for scoring service.
type Config struct {
BadResponsesScorerConfig *BadResponsesScorerConfig
BlockProviderScorerConfig *BlockProviderScorerConfig
}
// NewService provides fully initialized peer scoring service.
func NewService(ctx context.Context, store *peerdata.Store, config *Config) *Service {
s := &Service{
ctx: ctx,
store: store,
}
s.scorers.badResponsesScorer = newBadResponsesScorer(ctx, store, config.BadResponsesScorerConfig)
s.scorers.blockProviderScorer = newBlockProviderScorer(ctx, store, config.BlockProviderScorerConfig)
go s.loop(s.ctx)
return s
}
// BadResponsesScorer exposes bad responses scoring service.
func (s *Service) BadResponsesScorer() *BadResponsesScorer {
return s.scorers.badResponsesScorer
}
// BlockProviderScorer exposes block provider scoring service.
func (s *Service) BlockProviderScorer() *BlockProviderScorer {
return s.scorers.blockProviderScorer
}
// Score returns calculated peer score across all tracked metrics.
func (s *Service) Score(pid peer.ID) float64 {
s.store.RLock()
defer s.store.RUnlock()
score := float64(0)
if _, ok := s.store.PeerData(pid); !ok {
return 0
}
score += s.scorers.badResponsesScorer.score(pid)
score += s.scorers.blockProviderScorer.score(pid)
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
}
// loop handles background tasks.
func (s *Service) loop(ctx context.Context) {
decayBadResponsesStats := time.NewTicker(s.scorers.badResponsesScorer.Params().DecayInterval)
defer decayBadResponsesStats.Stop()
decayBlockProviderStats := time.NewTicker(s.scorers.blockProviderScorer.Params().DecayInterval)
defer decayBlockProviderStats.Stop()
for {
select {
case <-decayBadResponsesStats.C:
s.scorers.badResponsesScorer.Decay()
case <-decayBlockProviderStats.C:
s.scorers.blockProviderScorer.Decay()
case <-ctx.Done():
return
}
}
}

View File

@@ -1,4 +1,4 @@
package peers_test
package scorers_test
import (
"context"
@@ -9,10 +9,11 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestPeerScorer_PeerScorerManager_Init(t *testing.T) {
func TestScorers_Service_Init(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -21,36 +22,36 @@ func TestPeerScorer_PeerScorerManager_Init(t *testing.T) {
t.Run("default config", func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
})
t.Run("bad responses scorer", func(t *testing.T) {
params := peerStatuses.Scorers().BadResponsesScorer().Params()
assert.Equal(t, peers.DefaultBadResponsesThreshold, params.Threshold, "Unexpected threshold value")
assert.Equal(t, peers.DefaultBadResponsesWeight, params.Weight, "Unexpected weight value")
assert.Equal(t, peers.DefaultBadResponsesDecayInterval, params.DecayInterval, "Unexpected decay interval value")
assert.Equal(t, scorers.DefaultBadResponsesThreshold, params.Threshold, "Unexpected threshold value")
assert.Equal(t, scorers.DefaultBadResponsesWeight, params.Weight, "Unexpected weight value")
assert.Equal(t, scorers.DefaultBadResponsesDecayInterval, params.DecayInterval, "Unexpected decay interval value")
})
t.Run("block providers scorer", func(t *testing.T) {
params := peerStatuses.Scorers().BlockProviderScorer().Params()
assert.Equal(t, peers.DefaultBlockProviderProcessedBatchWeight, params.ProcessedBatchWeight)
assert.Equal(t, peers.DefaultBlockProviderProcessedBlocksCap, params.ProcessedBlocksCap)
assert.Equal(t, peers.DefaultBlockProviderDecayInterval, params.DecayInterval)
assert.Equal(t, peers.DefaultBlockProviderDecay, params.Decay)
assert.Equal(t, peers.DefaultBlockProviderStalePeerRefreshInterval, params.StalePeerRefreshInterval)
assert.Equal(t, scorers.DefaultBlockProviderProcessedBatchWeight, params.ProcessedBatchWeight)
assert.Equal(t, scorers.DefaultBlockProviderProcessedBlocksCap, params.ProcessedBlocksCap)
assert.Equal(t, scorers.DefaultBlockProviderDecayInterval, params.DecayInterval)
assert.Equal(t, scorers.DefaultBlockProviderDecay, params.Decay)
assert.Equal(t, scorers.DefaultBlockProviderStalePeerRefreshInterval, params.StalePeerRefreshInterval)
})
})
t.Run("explicit config", func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 2,
Weight: -1,
DecayInterval: 1 * time.Minute,
},
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.2,
ProcessedBlocksCap: batchSize * 5,
DecayInterval: 1 * time.Minute,
@@ -79,13 +80,13 @@ func TestPeerScorer_PeerScorerManager_Init(t *testing.T) {
})
}
func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
func TestScorers_Service_Score(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
batchSize := uint64(flags.Get().BlockBatchLimit)
peerScores := func(s *peers.PeerScorerManager, pids []peer.ID) map[string]float64 {
peerScores := func(s *scorers.Service, pids []peer.ID) map[string]float64 {
scores := make(map[string]float64, len(pids))
for _, pid := range pids {
scores[string(pid)] = s.Score(pid)
@@ -93,7 +94,7 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
return scores
}
pack := func(scorer *peers.PeerScorerManager, s1, s2, s3 float64) map[string]float64 {
pack := func(scorer *scorers.Service, s1, s2, s3 float64) map[string]float64 {
return map[string]float64{
"peer1": roundScore(s1),
"peer2": roundScore(s2),
@@ -101,14 +102,14 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
}
}
setupScorer := func() (*peers.PeerScorerManager, []peer.ID) {
setupScorer := func() (*scorers.Service, []peer.ID) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 5,
},
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
Decay: 64,
},
},
@@ -125,7 +126,7 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
t.Run("no peer registered", func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{},
ScorerParams: &scorers.Config{},
})
s := peerStatuses.Scorers()
assert.Equal(t, 0.0, s.BadResponsesScorer().Score("peer1"))
@@ -208,19 +209,19 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
})
}
func TestPeerScorer_PeerScorerManager_loop(t *testing.T) {
func TestScorers_Service_loop(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 5,
Weight: -0.5,
DecayInterval: 50 * time.Millisecond,
},
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
DecayInterval: 25 * time.Millisecond,
Decay: 64,
},

View File

@@ -1,7 +1,9 @@
// Package peers provides information about peers at the eth2 protocol level.
// "Protocol level" is the level above the network level, so this layer never sees or interacts with (for example) hosts that are
// uncontactable due to being down, firewalled, etc. Instead, this works with peers that are contactable but may or may not be of
// the correct fork version, not currently required due to the number of current connections, etc.
//
// "Protocol level" is the level above the network level, so this layer never sees or interacts with
// (for example) hosts that are uncontactable due to being down, firewalled, etc. Instead, this works
// with peers that are contactable but may or may not be of the correct fork version, not currently
// required due to the number of current connections, etc.
//
// A peer can have one of a number of states:
//
@@ -15,13 +17,13 @@
// - active if we are connecting or connected
// - inactive if we are disconnecting or disconnected
//
// Peer information is persistent for the run of the service. This allows for collection of useful long-term statistics such as
// number of bad responses obtained from the peer, giving the basis for decisions to not talk to known-bad peers.
// Peer information is persistent for the run of the service. This allows for collection of useful
// long-term statistics such as number of bad responses obtained from the peer, giving the basis for
// decisions to not talk to known-bad peers (by de-scoring them).
package peers
import (
"context"
"errors"
"sort"
"time"
@@ -30,20 +32,18 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"
)
// PeerConnectionState is the state of the connection.
type PeerConnectionState ethpb.ConnectionState
const (
// PeerDisconnected means there is no connection to the peer.
PeerDisconnected PeerConnectionState = iota
PeerDisconnected peerdata.PeerConnectionState = iota
// PeerDisconnecting means there is an on-going attempt to disconnect from the peer.
PeerDisconnecting
// PeerConnected means the peer has an active connection.
@@ -55,16 +55,11 @@ const (
// Additional buffer beyond current peer limit, from which we can store the relevant peer statuses.
const maxLimitBuffer = 150
var (
// ErrPeerUnknown is returned when there is an attempt to obtain data from a peer that is not known.
ErrPeerUnknown = errors.New("peer unknown")
)
// Status is the structure holding the peer status information.
type Status struct {
ctx context.Context
scorers *PeerScorerManager
store *peerDataStore
scorers *scorers.Service
store *peerdata.Store
}
// StatusConfig represents peer status service params.
@@ -72,29 +67,29 @@ type StatusConfig struct {
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
PeerLimit int
// ScorerParams holds peer scorer configuration params.
ScorerParams *PeerScorerConfig
ScorerParams *scorers.Config
}
// NewStatus creates a new status entity.
func NewStatus(ctx context.Context, config *StatusConfig) *Status {
store := newPeerDataStore(ctx, &peerDataStoreConfig{
maxPeers: maxLimitBuffer + config.PeerLimit,
store := peerdata.NewStore(ctx, &peerdata.StoreConfig{
MaxPeers: maxLimitBuffer + config.PeerLimit,
})
return &Status{
ctx: ctx,
store: store,
scorers: newPeerScorerManager(ctx, store, config.ScorerParams),
scorers: scorers.NewService(ctx, store, config.ScorerParams),
}
}
// Scorers exposes peer scoring management service.
func (p *Status) Scorers() *PeerScorerManager {
func (p *Status) Scorers() *scorers.Service {
return p.scorers
}
// MaxPeerLimit returns the max peer limit stored in the current peer store.
func (p *Status) MaxPeerLimit() int {
return p.store.config.maxPeers
return p.store.Config().MaxPeers
}
// Add adds a peer.
@@ -103,25 +98,25 @@ func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, dire
p.store.Lock()
defer p.store.Unlock()
if peerData, ok := p.store.peers[pid]; ok {
if peerData, ok := p.store.PeerData(pid); ok {
// Peer already exists, just update its address info.
peerData.address = address
peerData.direction = direction
peerData.Address = address
peerData.Direction = direction
if record != nil {
peerData.enr = record
peerData.Enr = record
}
return
}
peerData := &peerData{
address: address,
direction: direction,
peerData := &peerdata.PeerData{
Address: address,
Direction: direction,
// Peers start disconnected; state will be updated when the handshake process begins.
connState: PeerDisconnected,
ConnState: PeerDisconnected,
}
if record != nil {
peerData.enr = record
peerData.Enr = record
}
p.store.peers[pid] = peerData
p.store.SetPeerData(pid, peerData)
}
// Address returns the multiaddress of the given remote peer.
@@ -130,10 +125,10 @@ func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return peerData.address, nil
if peerData, ok := p.store.PeerData(pid); ok {
return peerData.Address, nil
}
return nil, ErrPeerUnknown
return nil, peerdata.ErrPeerUnknown
}
// Direction returns the direction of the given remote peer.
@@ -142,10 +137,10 @@ func (p *Status) Direction(pid peer.ID) (network.Direction, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return peerData.direction, nil
if peerData, ok := p.store.PeerData(pid); ok {
return peerData.Direction, nil
}
return network.DirUnknown, ErrPeerUnknown
return network.DirUnknown, peerdata.ErrPeerUnknown
}
// ENR returns the enr for the corresponding peer id.
@@ -153,10 +148,10 @@ func (p *Status) ENR(pid peer.ID) (*enr.Record, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return peerData.enr, nil
if peerData, ok := p.store.PeerData(pid); ok {
return peerData.Enr, nil
}
return nil, ErrPeerUnknown
return nil, peerdata.ErrPeerUnknown
}
// SetChainState sets the chain state of the given remote peer.
@@ -164,9 +159,9 @@ func (p *Status) SetChainState(pid peer.ID, chainState *pb.Status) {
p.store.Lock()
defer p.store.Unlock()
peerData := p.fetch(pid)
peerData.chainState = chainState
peerData.chainStateLastUpdated = timeutils.Now()
peerData := p.store.PeerDataGetOrCreate(pid)
peerData.ChainState = chainState
peerData.ChainStateLastUpdated = timeutils.Now()
}
// ChainState gets the chain state of the given remote peer.
@@ -176,10 +171,10 @@ func (p *Status) ChainState(pid peer.ID) (*pb.Status, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return peerData.chainState, nil
if peerData, ok := p.store.PeerData(pid); ok {
return peerData.ChainState, nil
}
return nil, ErrPeerUnknown
return nil, peerdata.ErrPeerUnknown
}
// IsActive checks if a peers is active and returns the result appropriately.
@@ -187,8 +182,8 @@ func (p *Status) IsActive(pid peer.ID) bool {
p.store.RLock()
defer p.store.RUnlock()
peerData, ok := p.store.peers[pid]
return ok && (peerData.connState == PeerConnected || peerData.connState == PeerConnecting)
peerData, ok := p.store.PeerData(pid)
return ok && (peerData.ConnState == PeerConnected || peerData.ConnState == PeerConnecting)
}
// SetMetadata sets the metadata of the given remote peer.
@@ -196,8 +191,8 @@ func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) {
p.store.Lock()
defer p.store.Unlock()
peerData := p.fetch(pid)
peerData.metaData = metaData
peerData := p.store.PeerDataGetOrCreate(pid)
peerData.MetaData = metaData
}
// Metadata returns a copy of the metadata corresponding to the provided
@@ -206,10 +201,10 @@ func (p *Status) Metadata(pid peer.ID) (*pb.MetaData, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return proto.Clone(peerData.metaData).(*pb.MetaData), nil
if peerData, ok := p.store.PeerData(pid); ok {
return proto.Clone(peerData.MetaData).(*pb.MetaData), nil
}
return nil, ErrPeerUnknown
return nil, peerdata.ErrPeerUnknown
}
// CommitteeIndices retrieves the committee subnets the peer is subscribed to.
@@ -217,13 +212,13 @@ func (p *Status) CommitteeIndices(pid peer.ID) ([]uint64, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
if peerData.enr == nil || peerData.metaData == nil {
if peerData, ok := p.store.PeerData(pid); ok {
if peerData.Enr == nil || peerData.MetaData == nil {
return []uint64{}, nil
}
return retrieveIndicesFromBitfield(peerData.metaData.Attnets), nil
return retrieveIndicesFromBitfield(peerData.MetaData.Attnets), nil
}
return nil, ErrPeerUnknown
return nil, peerdata.ErrPeerUnknown
}
// SubscribedToSubnet retrieves the peers subscribed to the given
@@ -233,11 +228,11 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.peers {
for pid, peerData := range p.store.Peers() {
// look at active peers
connectedStatus := peerData.connState == PeerConnecting || peerData.connState == PeerConnected
if connectedStatus && peerData.metaData != nil && peerData.metaData.Attnets != nil {
indices := retrieveIndicesFromBitfield(peerData.metaData.Attnets)
connectedStatus := peerData.ConnState == PeerConnecting || peerData.ConnState == PeerConnected
if connectedStatus && peerData.MetaData != nil && peerData.MetaData.Attnets != nil {
indices := retrieveIndicesFromBitfield(peerData.MetaData.Attnets)
for _, idx := range indices {
if idx == index {
peers = append(peers, pid)
@@ -250,24 +245,24 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
}
// SetConnectionState sets the connection state of the given remote peer.
func (p *Status) SetConnectionState(pid peer.ID, state PeerConnectionState) {
func (p *Status) SetConnectionState(pid peer.ID, state peerdata.PeerConnectionState) {
p.store.Lock()
defer p.store.Unlock()
peerData := p.fetch(pid)
peerData.connState = state
peerData := p.store.PeerDataGetOrCreate(pid)
peerData.ConnState = state
}
// ConnectionState gets the connection state of the given remote peer.
// This will error if the peer does not exist.
func (p *Status) ConnectionState(pid peer.ID) (PeerConnectionState, error) {
func (p *Status) ConnectionState(pid peer.ID) (peerdata.PeerConnectionState, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return peerData.connState, nil
if peerData, ok := p.store.PeerData(pid); ok {
return peerData.ConnState, nil
}
return PeerDisconnected, ErrPeerUnknown
return PeerDisconnected, peerdata.ErrPeerUnknown
}
// ChainStateLastUpdated gets the last time the chain state of the given remote peer was updated.
@@ -276,10 +271,10 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return peerData.chainStateLastUpdated, nil
if peerData, ok := p.store.PeerData(pid); ok {
return peerData.ChainStateLastUpdated, nil
}
return timeutils.Now(), ErrPeerUnknown
return timeutils.Now(), peerdata.ErrPeerUnknown
}
// IsBad states if the peer is to be considered bad.
@@ -293,8 +288,8 @@ func (p *Status) Connecting() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.peers {
if peerData.connState == PeerConnecting {
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnecting {
peers = append(peers, pid)
}
}
@@ -306,8 +301,8 @@ func (p *Status) Connected() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.peers {
if peerData.connState == PeerConnected {
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected {
peers = append(peers, pid)
}
}
@@ -319,8 +314,8 @@ func (p *Status) Active() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.peers {
if peerData.connState == PeerConnecting || peerData.connState == PeerConnected {
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnecting || peerData.ConnState == PeerConnected {
peers = append(peers, pid)
}
}
@@ -332,8 +327,8 @@ func (p *Status) Disconnecting() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.peers {
if peerData.connState == PeerDisconnecting {
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnecting {
peers = append(peers, pid)
}
}
@@ -345,8 +340,8 @@ func (p *Status) Disconnected() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.peers {
if peerData.connState == PeerDisconnected {
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnected {
peers = append(peers, pid)
}
}
@@ -358,8 +353,8 @@ func (p *Status) Inactive() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.peers {
if peerData.connState == PeerDisconnecting || peerData.connState == PeerDisconnected {
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnecting || peerData.ConnState == PeerDisconnected {
peers = append(peers, pid)
}
}
@@ -375,8 +370,8 @@ func (p *Status) Bad() []peer.ID {
func (p *Status) All() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
pids := make([]peer.ID, 0, len(p.store.peers))
for pid := range p.store.peers {
pids := make([]peer.ID, 0, len(p.store.Peers()))
for pid := range p.store.Peers() {
pids = append(pids, pid)
}
return pids
@@ -388,21 +383,24 @@ func (p *Status) Prune() {
defer p.store.Unlock()
// Exit early if there is nothing to prune.
if len(p.store.peers) <= p.store.config.maxPeers {
if len(p.store.Peers()) <= p.store.Config().MaxPeers {
return
}
notBadPeer := func(peerData *peerdata.PeerData) bool {
return peerData.BadResponses < p.scorers.BadResponsesScorer().Params().Threshold
}
type peerResp struct {
pid peer.ID
badResp int
}
peersToPrune := make([]*peerResp, 0)
// Select disconnected peers with a smaller bad response count.
for pid, peerData := range p.store.peers {
if peerData.connState == PeerDisconnected && !p.scorers.BadResponsesScorer().isBadPeer(pid) {
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerDisconnected && notBadPeer(peerData) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
badResp: p.store.peers[pid].badResponses,
badResp: peerData.BadResponses,
})
}
}
@@ -415,7 +413,7 @@ func (p *Status) Prune() {
return peersToPrune[i].badResp < peersToPrune[j].badResp
})
limitDiff := len(p.store.peers) - p.store.config.maxPeers
limitDiff := len(p.store.Peers()) - p.store.Config().MaxPeers
if limitDiff > len(peersToPrune) {
limitDiff = len(peersToPrune)
}
@@ -424,7 +422,7 @@ func (p *Status) Prune() {
// Delete peers from map.
for _, peerData := range peersToPrune {
delete(p.store.peers, peerData.pid)
p.store.DeletePeerData(peerData.pid)
}
}
@@ -526,22 +524,14 @@ func (p *Status) BestNonFinalized(minPeers int, ourFinalizedEpoch uint64) (uint6
return targetEpoch, potentialPIDs
}
// fetch is a helper function that fetches a peer status, possibly creating it.
func (p *Status) fetch(pid peer.ID) *peerData {
if _, ok := p.store.peers[pid]; !ok {
p.store.peers[pid] = &peerData{}
}
return p.store.peers[pid]
}
// HighestEpoch returns the highest epoch reported epoch amongst peers.
func (p *Status) HighestEpoch() uint64 {
p.store.RLock()
defer p.store.RUnlock()
var highestSlot uint64
for _, peerData := range p.store.peers {
if peerData != nil && peerData.chainState != nil && peerData.chainState.HeadSlot > highestSlot {
highestSlot = peerData.chainState.HeadSlot
for _, peerData := range p.store.Peers() {
if peerData != nil && peerData.ChainState != nil && peerData.ChainState.HeadSlot > highestSlot {
highestSlot = peerData.ChainState.HeadSlot
}
}
return helpers.SlotToEpoch(highestSlot)

View File

@@ -11,6 +11,8 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
@@ -21,8 +23,8 @@ func TestStatus(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -35,8 +37,8 @@ func TestPeerExplicitAdd(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -76,8 +78,8 @@ func TestPeerNoENR(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -100,8 +102,8 @@ func TestPeerNoOverwriteENR(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -127,8 +129,8 @@ func TestErrUnknownPeer(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -138,30 +140,30 @@ func TestErrUnknownPeer(t *testing.T) {
require.NoError(t, err)
_, err = p.Address(id)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
assert.ErrorContains(t, peerdata.ErrPeerUnknown.Error(), err)
_, err = p.Direction(id)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
assert.ErrorContains(t, peerdata.ErrPeerUnknown.Error(), err)
_, err = p.ChainState(id)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
assert.ErrorContains(t, peerdata.ErrPeerUnknown.Error(), err)
_, err = p.ConnectionState(id)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
assert.ErrorContains(t, peerdata.ErrPeerUnknown.Error(), err)
_, err = p.ChainStateLastUpdated(id)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
assert.ErrorContains(t, peerdata.ErrPeerUnknown.Error(), err)
_, err = p.Scorers().BadResponsesScorer().Count(id)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
assert.ErrorContains(t, peerdata.ErrPeerUnknown.Error(), err)
}
func TestPeerCommitteeIndices(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -197,8 +199,8 @@ func TestPeerSubscribedToSubnet(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -241,8 +243,8 @@ func TestPeerImplicitAdd(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -264,8 +266,8 @@ func TestPeerChainState(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -299,8 +301,8 @@ func TestPeerBadResponses(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -349,8 +351,8 @@ func TestAddMetaData(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -378,8 +380,8 @@ func TestPeerConnectionStatuses(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -420,8 +422,8 @@ func TestPrune(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -471,8 +473,8 @@ func TestPrune(t *testing.T) {
func TestTrimmedOrderedPeers(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 1,
},
},
@@ -541,8 +543,8 @@ func TestBestPeer(t *testing.T) {
junkRoot := [32]byte{'j', 'u', 'n', 'k'}
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -593,8 +595,8 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {
maxPeers := 10
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -615,8 +617,8 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {
func TestStatus_BestNonFinalized(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 2,
},
},
@@ -641,8 +643,8 @@ func TestStatus_CurrentEpoch(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
},
},
@@ -667,7 +669,7 @@ func TestStatus_CurrentEpoch(t *testing.T) {
}
// addPeer is a helper to add a peer with a given connection state)
func addPeer(t *testing.T, p *peers.Status, state peers.PeerConnectionState) peer.ID {
func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID {
// Set up some peers with different states
mhBytes := []byte{0x11, 0x04}
idBytes := make([]byte, 4)

View File

@@ -1,51 +0,0 @@
package peers
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
// peerDataStore is a container for various peer related data (both protocol and app level).
// Container implements RWMutex, so data access can be restricted on the container level. This allows
// different components rely on the very same peer map container.
type peerDataStore struct {
sync.RWMutex
ctx context.Context
config *peerDataStoreConfig
peers map[peer.ID]*peerData
}
// peerDataStoreConfig holds peer store parameters.
type peerDataStoreConfig struct {
maxPeers int
}
// peerData aggregates protocol and application level info about a single peer.
type peerData struct {
address ma.Multiaddr
direction network.Direction
connState PeerConnectionState
chainState *pb.Status
enr *enr.Record
metaData *pb.MetaData
chainStateLastUpdated time.Time
badResponses int
processedBlocks uint64
blockProviderUpdated time.Time
}
// newPeerDataStore creates peer store.
func newPeerDataStore(ctx context.Context, config *peerDataStoreConfig) *peerDataStore {
return &peerDataStore{
ctx: ctx,
config: config,
peers: make(map[peer.ID]*peerData),
}
}

View File

@@ -23,6 +23,7 @@ import (
filter "github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"go.opencensus.io/trace"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
@@ -164,8 +165,8 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
s.peers = peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: int(s.cfg.MaxPeers),
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
Weight: -100,
DecayInterval: time.Hour,

View File

@@ -18,6 +18,7 @@ go_library(
deps = [
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
log "github.com/sirupsen/logrus"
)
@@ -28,8 +29,8 @@ func (m *MockPeersProvider) Peers() *peers.Status {
if m.peers == nil {
m.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 5,
},
},

View File

@@ -24,6 +24,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/sirupsen/logrus"
)
@@ -55,8 +56,8 @@ func NewTestP2P(t *testing.T) *TestP2P {
peerStatuses := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesScorerConfig: &peers.BadResponsesScorerConfig{
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 5,
},
},

View File

@@ -25,7 +25,7 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/sync:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
@@ -113,6 +113,7 @@ go_test(
"//beacon-chain/flags:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/sync:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",

View File

@@ -10,7 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
scorers "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"

View File

@@ -19,7 +19,7 @@ import (
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2pm "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
beaconsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -772,7 +772,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
tests := []struct {
name string
args args
update func(s *peers.BlockProviderScorer)
update func(s *scorers.BlockProviderScorer)
want []peer.ID
}{
{
@@ -821,7 +821,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
peersPercentage: 1.0,
capacityWeight: 0.2,
},
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("a", batchSize*2)
s.IncrementProcessedBlocks("b", batchSize*2)
s.IncrementProcessedBlocks("c", batchSize*2)
@@ -843,7 +843,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
peersPercentage: 0.8,
capacityWeight: 0.2,
},
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
s.IncrementProcessedBlocks("e", s.Params().ProcessedBlocksCap)
s.IncrementProcessedBlocks("b", s.Params().ProcessedBlocksCap/2)
s.IncrementProcessedBlocks("c", s.Params().ProcessedBlocksCap/4)
@@ -865,7 +865,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
peersPercentage: 0.8,
capacityWeight: 0.2,
},
update: func(s *peers.BlockProviderScorer) {
update: func(s *scorers.BlockProviderScorer) {
// Make sure that score takes priority over capacity.
s.IncrementProcessedBlocks("c", batchSize*5)
s.IncrementProcessedBlocks("b", batchSize*15)