mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Use Scoring Parameters To Enable Global Score (#8794)
* fix score parameters * add check * fix deadlock issues * fix * fix again * gaz * comment * fix tests and victor's review * gaz * clean up * fix tests * fix tests * gate behind flag * add scorer to dev Co-authored-by: prestonvanloon <preston@prysmaticlabs.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
@@ -9,6 +9,7 @@ go_library(
|
||||
"//beacon-chain/core:go_default_library",
|
||||
"//beacon-chain/p2p/peers/peerdata:go_default_library",
|
||||
"//beacon-chain/p2p/peers/scorers:go_default_library",
|
||||
"//config/features:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//crypto/rand:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
|
||||
@@ -15,6 +15,9 @@ const (
|
||||
// DefaultBadResponsesDecayInterval defines how often to decay previous statistics.
|
||||
// Every interval bad responses counter will be decremented by 1.
|
||||
DefaultBadResponsesDecayInterval = time.Hour
|
||||
// DefaultBadResponsesPenaltyFactor defines the penalty factor applied to a peer based on their bad
|
||||
// response count.
|
||||
DefaultBadResponsesPenaltyFactor = 10
|
||||
)
|
||||
|
||||
// BadResponsesScorer represents bad responses scoring service.
|
||||
@@ -68,8 +71,9 @@ func (s *BadResponsesScorer) score(pid peer.ID) float64 {
|
||||
}
|
||||
if peerData.BadResponses > 0 {
|
||||
score = float64(peerData.BadResponses) / float64(s.config.Threshold)
|
||||
// Since score represents a penalty, negate it.
|
||||
score *= -1
|
||||
// Since score represents a penalty, negate it and multiply
|
||||
// it by a factor.
|
||||
score *= -DefaultBadResponsesPenaltyFactor
|
||||
}
|
||||
return score
|
||||
}
|
||||
|
||||
@@ -30,12 +30,12 @@ func TestScorers_BadResponses_Score(t *testing.T) {
|
||||
|
||||
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score for unregistered peer")
|
||||
scorer.Increment("peer1")
|
||||
assert.Equal(t, -0.25, scorer.Score("peer1"))
|
||||
assert.Equal(t, -2.5, scorer.Score("peer1"))
|
||||
scorer.Increment("peer1")
|
||||
assert.Equal(t, -0.5, scorer.Score("peer1"))
|
||||
assert.Equal(t, float64(-5), scorer.Score("peer1"))
|
||||
scorer.Increment("peer1")
|
||||
scorer.Increment("peer1")
|
||||
assert.Equal(t, -1.0, scorer.Score("peer1"))
|
||||
assert.Equal(t, -100.0, scorer.Score("peer1"))
|
||||
assert.Equal(t, true, scorer.IsBadPeer("peer1"))
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,11 @@ import (
|
||||
|
||||
var _ Scorer = (*GossipScorer)(nil)
|
||||
|
||||
const (
|
||||
// The boundary till which a peer's gossip score is acceptable.
|
||||
gossipThreshold = -100.0
|
||||
)
|
||||
|
||||
// GossipScorer represents scorer that evaluates peers based on their gossip performance.
|
||||
// Gossip scoring metrics are periodically calculated in libp2p's internal pubsub module.
|
||||
type GossipScorer struct {
|
||||
@@ -58,7 +63,7 @@ func (s *GossipScorer) isBadPeer(pid peer.ID) bool {
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return peerData.GossipScore < 0
|
||||
return peerData.GossipScore < gossipThreshold
|
||||
}
|
||||
|
||||
// BadPeers returns the peers that are considered bad.
|
||||
|
||||
@@ -30,10 +30,10 @@ func TestScorers_Gossip_Score(t *testing.T) {
|
||||
{
|
||||
name: "existent bad peer",
|
||||
update: func(scorer *scorers.GossipScorer) {
|
||||
scorer.SetGossipData("peer1", -10.0, 1, nil)
|
||||
scorer.SetGossipData("peer1", -101.0, 1, nil)
|
||||
},
|
||||
check: func(scorer *scorers.GossipScorer) {
|
||||
assert.Equal(t, -10.0, scorer.Score("peer1"), "Unexpected score")
|
||||
assert.Equal(t, -101.0, scorer.Score("peer1"), "Unexpected score")
|
||||
assert.Equal(t, true, scorer.IsBadPeer("peer1"), "Unexpected good peer")
|
||||
},
|
||||
},
|
||||
|
||||
@@ -82,6 +82,8 @@ func (s *PeerStatusScorer) isBadPeer(pid peer.ID) bool {
|
||||
// Mark peer as bad, if the latest error is one of the terminal ones.
|
||||
terminalErrs := []error{
|
||||
p2ptypes.ErrWrongForkDigestVersion,
|
||||
p2ptypes.ErrInvalidFinalizedRoot,
|
||||
p2ptypes.ErrInvalidRequest,
|
||||
}
|
||||
for _, err := range terminalErrs {
|
||||
if errors.Is(peerData.ChainStateValidationError, err) {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
|
||||
"github.com/prysmaticlabs/prysm/config/features"
|
||||
)
|
||||
|
||||
var _ Scorer = (*Service)(nil)
|
||||
@@ -16,7 +17,9 @@ var _ Scorer = (*Service)(nil)
|
||||
const ScoreRoundingFactor = 10000
|
||||
|
||||
// BadPeerScore defines score that is returned for a bad peer (all other metrics are ignored).
|
||||
const BadPeerScore = -1.00
|
||||
// The bad peer score was decided to be based on our determined gossip threshold, so that
|
||||
// all the other scoring services have their relevant penalties on similar scales.
|
||||
const BadPeerScore = gossipThreshold
|
||||
|
||||
// Scorer defines minimum set of methods every peer scorer must expose.
|
||||
type Scorer interface {
|
||||
@@ -55,13 +58,13 @@ func NewService(ctx context.Context, store *peerdata.Store, config *Config) *Ser
|
||||
|
||||
// Register scorers.
|
||||
s.scorers.badResponsesScorer = newBadResponsesScorer(store, config.BadResponsesScorerConfig)
|
||||
s.setScorerWeight(s.scorers.badResponsesScorer, 1.0)
|
||||
s.setScorerWeight(s.scorers.badResponsesScorer, 0.3)
|
||||
s.scorers.blockProviderScorer = newBlockProviderScorer(store, config.BlockProviderScorerConfig)
|
||||
s.setScorerWeight(s.scorers.blockProviderScorer, 1.0)
|
||||
s.setScorerWeight(s.scorers.blockProviderScorer, 0.0)
|
||||
s.scorers.peerStatusScorer = newPeerStatusScorer(store, config.PeerStatusScorerConfig)
|
||||
s.setScorerWeight(s.scorers.peerStatusScorer, 0.0)
|
||||
s.setScorerWeight(s.scorers.peerStatusScorer, 0.3)
|
||||
s.scorers.gossipScorer = newGossipScorer(store, config.GossipScorerConfig)
|
||||
s.setScorerWeight(s.scorers.gossipScorer, 0.0)
|
||||
s.setScorerWeight(s.scorers.gossipScorer, 0.4)
|
||||
|
||||
// Start background tasks.
|
||||
go s.loop(ctx)
|
||||
@@ -104,7 +107,11 @@ func (s *Service) ActiveScorersCount() int {
|
||||
func (s *Service) Score(pid peer.ID) float64 {
|
||||
s.store.RLock()
|
||||
defer s.store.RUnlock()
|
||||
return s.ScoreNoLock(pid)
|
||||
}
|
||||
|
||||
// ScoreNoLock is a lock-free version of Score.
|
||||
func (s *Service) ScoreNoLock(pid peer.ID) float64 {
|
||||
score := float64(0)
|
||||
if _, ok := s.store.PeerData(pid); !ok {
|
||||
return 0
|
||||
@@ -120,19 +127,22 @@ func (s *Service) Score(pid peer.ID) float64 {
|
||||
func (s *Service) IsBadPeer(pid peer.ID) bool {
|
||||
s.store.RLock()
|
||||
defer s.store.RUnlock()
|
||||
return s.isBadPeer(pid)
|
||||
return s.IsBadPeerNoLock(pid)
|
||||
}
|
||||
|
||||
// isBadPeer is a lock-free version of isBadPeer.
|
||||
func (s *Service) isBadPeer(pid peer.ID) bool {
|
||||
// IsBadPeerNoLock is a lock-free version of IsBadPeer.
|
||||
func (s *Service) IsBadPeerNoLock(pid peer.ID) bool {
|
||||
if s.scorers.badResponsesScorer.isBadPeer(pid) {
|
||||
return true
|
||||
}
|
||||
if s.scorers.peerStatusScorer.isBadPeer(pid) {
|
||||
return true
|
||||
}
|
||||
// TODO(#6043): Hook in gossip scorer's relevant
|
||||
// method to check if peer has a bad gossip score.
|
||||
if features.Get().EnablePeerScorer {
|
||||
if s.scorers.gossipScorer.isBadPeer(pid) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -143,7 +153,7 @@ func (s *Service) BadPeers() []peer.ID {
|
||||
|
||||
badPeers := make([]peer.ID, 0)
|
||||
for pid := range s.store.Peers() {
|
||||
if s.isBadPeer(pid) {
|
||||
if s.IsBadPeerNoLock(pid) {
|
||||
badPeers = append(badPeers, pid)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +92,14 @@ func TestScorers_Service_Score(t *testing.T) {
|
||||
return scores
|
||||
}
|
||||
|
||||
blkProviderScorers := func(s *scorers.Service, pids []peer.ID) map[string]float64 {
|
||||
scores := make(map[string]float64, len(pids))
|
||||
for _, pid := range pids {
|
||||
scores[string(pid)] = s.BlockProviderScorer().Score(pid)
|
||||
}
|
||||
return scores
|
||||
}
|
||||
|
||||
pack := func(scorer *scorers.Service, s1, s2, s3 float64) map[string]float64 {
|
||||
return map[string]float64{
|
||||
"peer1": roundScore(s1),
|
||||
@@ -116,9 +124,7 @@ func TestScorers_Service_Score(t *testing.T) {
|
||||
pids := []peer.ID{"peer1", "peer2", "peer3"}
|
||||
for _, pid := range pids {
|
||||
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
|
||||
// Not yet used peer gets boosted score.
|
||||
startScore := s.BlockProviderScorer().MaxScore()
|
||||
assert.Equal(t, startScore/float64(s.ActiveScorersCount()), s.Score(pid), "Unexpected score for not yet used peer")
|
||||
assert.Equal(t, float64(0), s.Score(pid), "Unexpected score for not yet used peer")
|
||||
}
|
||||
return s, pids
|
||||
}
|
||||
@@ -136,8 +142,8 @@ func TestScorers_Service_Score(t *testing.T) {
|
||||
t.Run("bad responses score", func(t *testing.T) {
|
||||
s, pids := setupScorer()
|
||||
// Peers start with boosted start score (new peers are boosted by block provider).
|
||||
startScore := s.BlockProviderScorer().MaxScore() / float64(s.ActiveScorersCount())
|
||||
penalty := (-1 / float64(s.BadResponsesScorer().Params().Threshold)) / float64(s.ActiveScorersCount())
|
||||
startScore := float64(0)
|
||||
penalty := (-10 / float64(s.BadResponsesScorer().Params().Threshold)) * float64(0.3)
|
||||
|
||||
// Update peers' stats and test the effect on peer order.
|
||||
s.BadResponsesScorer().Increment("peer2")
|
||||
@@ -156,54 +162,53 @@ func TestScorers_Service_Score(t *testing.T) {
|
||||
t.Run("block providers score", func(t *testing.T) {
|
||||
s, pids := setupScorer()
|
||||
s1 := s.BlockProviderScorer()
|
||||
startScore := s.BlockProviderScorer().MaxScore() / 2
|
||||
batchWeight := s1.Params().ProcessedBatchWeight / 2
|
||||
startScore := s.BlockProviderScorer().MaxScore()
|
||||
batchWeight := s1.Params().ProcessedBatchWeight
|
||||
|
||||
// Partial batch.
|
||||
s1.IncrementProcessedBlocks("peer1", batchSize/4)
|
||||
assert.Equal(t, 0.0, s.Score("peer1"), "Unexpected %q score", "peer1")
|
||||
assert.Equal(t, 0.0, s.BlockProviderScorer().Score("peer1"), "Unexpected %q score", "peer1")
|
||||
|
||||
// Single batch.
|
||||
s1.IncrementProcessedBlocks("peer1", batchSize)
|
||||
assert.DeepEqual(t, pack(s, batchWeight, startScore, startScore), peerScores(s, pids), "Unexpected scores")
|
||||
assert.DeepEqual(t, pack(s, batchWeight, startScore, startScore), blkProviderScorers(s, pids), "Unexpected scores")
|
||||
|
||||
// Multiple batches.
|
||||
s1.IncrementProcessedBlocks("peer2", batchSize*4)
|
||||
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, startScore), peerScores(s, pids), "Unexpected scores")
|
||||
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, startScore), blkProviderScorers(s, pids), "Unexpected scores")
|
||||
|
||||
// Partial batch.
|
||||
s1.IncrementProcessedBlocks("peer3", batchSize/2)
|
||||
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, 0), peerScores(s, pids), "Unexpected scores")
|
||||
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, 0), blkProviderScorers(s, pids), "Unexpected scores")
|
||||
|
||||
// See effect of decaying.
|
||||
assert.Equal(t, batchSize+batchSize/4, s1.ProcessedBlocks("peer1"))
|
||||
assert.Equal(t, batchSize*4, s1.ProcessedBlocks("peer2"))
|
||||
assert.Equal(t, batchSize/2, s1.ProcessedBlocks("peer3"))
|
||||
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, 0), peerScores(s, pids), "Unexpected scores")
|
||||
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, 0), blkProviderScorers(s, pids), "Unexpected scores")
|
||||
s1.Decay()
|
||||
assert.Equal(t, batchSize/4, s1.ProcessedBlocks("peer1"))
|
||||
assert.Equal(t, batchSize*3, s1.ProcessedBlocks("peer2"))
|
||||
assert.Equal(t, uint64(0), s1.ProcessedBlocks("peer3"))
|
||||
assert.DeepEqual(t, pack(s, 0, batchWeight*3, 0), peerScores(s, pids), "Unexpected scores")
|
||||
assert.DeepEqual(t, pack(s, 0, batchWeight*3, 0), blkProviderScorers(s, pids), "Unexpected scores")
|
||||
})
|
||||
|
||||
t.Run("overall score", func(t *testing.T) {
|
||||
s, _ := setupScorer()
|
||||
s1 := s.BlockProviderScorer()
|
||||
s2 := s.BadResponsesScorer()
|
||||
batchWeight := s1.Params().ProcessedBatchWeight / float64(s.ActiveScorersCount())
|
||||
penalty := (-1 / float64(s.BadResponsesScorer().Params().Threshold)) / float64(s.ActiveScorersCount())
|
||||
penalty := (-10 / float64(s.BadResponsesScorer().Params().Threshold)) * float64(0.3)
|
||||
|
||||
// Full score, no penalty.
|
||||
s1.IncrementProcessedBlocks("peer1", batchSize*5)
|
||||
assert.Equal(t, roundScore(batchWeight*5), s.Score("peer1"))
|
||||
assert.Equal(t, float64(0), s.Score("peer1"))
|
||||
// Now, adjust score by introducing penalty for bad responses.
|
||||
s2.Increment("peer1")
|
||||
s2.Increment("peer1")
|
||||
assert.Equal(t, roundScore(batchWeight*5+2*penalty), s.Score("peer1"), "Unexpected overall score")
|
||||
assert.Equal(t, roundScore(2*penalty), s.Score("peer1"), "Unexpected overall score")
|
||||
// If peer continues to misbehave, score becomes negative.
|
||||
s2.Increment("peer1")
|
||||
assert.Equal(t, roundScore(batchWeight*5+3*penalty), s.Score("peer1"), "Unexpected overall score")
|
||||
assert.Equal(t, roundScore(3*penalty), s.Score("peer1"), "Unexpected overall score")
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
|
||||
"github.com/prysmaticlabs/prysm/config/features"
|
||||
"github.com/prysmaticlabs/prysm/config/params"
|
||||
"github.com/prysmaticlabs/prysm/crypto/rand"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
@@ -326,7 +327,14 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
|
||||
// IsBad states if the peer is to be considered bad (by *any* of the registered scorers).
|
||||
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
|
||||
func (p *Status) IsBad(pid peer.ID) bool {
|
||||
return p.isfromBadIP(pid) || p.scorers.IsBadPeer(pid)
|
||||
p.store.RLock()
|
||||
defer p.store.RUnlock()
|
||||
return p.isBad(pid)
|
||||
}
|
||||
|
||||
// isBad is the lock-free version of IsBad.
|
||||
func (p *Status) isBad(pid peer.ID) bool {
|
||||
return p.isfromBadIP(pid) || p.scorers.IsBadPeerNoLock(pid)
|
||||
}
|
||||
|
||||
// NextValidTime gets the earliest possible time it is to contact/dial
|
||||
@@ -535,6 +543,58 @@ func (p *Status) Prune() {
|
||||
p.store.Lock()
|
||||
defer p.store.Unlock()
|
||||
|
||||
// Default to old method if flag isnt enabled.
|
||||
if !features.Get().EnablePeerScorer {
|
||||
p.deprecatedPrune()
|
||||
return
|
||||
}
|
||||
// Exit early if there is nothing to prune.
|
||||
if len(p.store.Peers()) <= p.store.Config().MaxPeers {
|
||||
return
|
||||
}
|
||||
notBadPeer := func(pid peer.ID) bool {
|
||||
return !p.isBad(pid)
|
||||
}
|
||||
type peerResp struct {
|
||||
pid peer.ID
|
||||
score float64
|
||||
}
|
||||
peersToPrune := make([]*peerResp, 0)
|
||||
// Select disconnected peers with a smaller bad response count.
|
||||
for pid, peerData := range p.store.Peers() {
|
||||
if peerData.ConnState == PeerDisconnected && notBadPeer(pid) {
|
||||
peersToPrune = append(peersToPrune, &peerResp{
|
||||
pid: pid,
|
||||
score: p.Scorers().ScoreNoLock(pid),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Sort peers in descending order, so the peers with the
|
||||
// highest score are pruned first. This
|
||||
// is to protect the node from malicious/lousy peers so
|
||||
// that their memory is still kept.
|
||||
sort.Slice(peersToPrune, func(i, j int) bool {
|
||||
return peersToPrune[i].score > peersToPrune[j].score
|
||||
})
|
||||
|
||||
limitDiff := len(p.store.Peers()) - p.store.Config().MaxPeers
|
||||
if limitDiff > len(peersToPrune) {
|
||||
limitDiff = len(peersToPrune)
|
||||
}
|
||||
|
||||
peersToPrune = peersToPrune[:limitDiff]
|
||||
|
||||
// Delete peers from map.
|
||||
for _, peerData := range peersToPrune {
|
||||
p.store.DeletePeerData(peerData.pid)
|
||||
}
|
||||
p.tallyIPTracker()
|
||||
}
|
||||
|
||||
// Deprecated: This is the old peer pruning method based on
|
||||
// bad response counts.
|
||||
func (p *Status) deprecatedPrune() {
|
||||
// Exit early if there is nothing to prune.
|
||||
if len(p.store.Peers()) <= p.store.Config().MaxPeers {
|
||||
return
|
||||
@@ -570,9 +630,7 @@ func (p *Status) Prune() {
|
||||
if limitDiff > len(peersToPrune) {
|
||||
limitDiff = len(peersToPrune)
|
||||
}
|
||||
|
||||
peersToPrune = peersToPrune[:limitDiff]
|
||||
|
||||
// Delete peers from map.
|
||||
for _, peerData := range peersToPrune {
|
||||
p.store.DeletePeerData(peerData.pid)
|
||||
@@ -687,6 +745,70 @@ func (p *Status) BestNonFinalized(minPeers int, ourHeadEpoch types.Epoch) (types
|
||||
// bad response count. In the future scoring will be used
|
||||
// to determine the most suitable peers to take out.
|
||||
func (p *Status) PeersToPrune() []peer.ID {
|
||||
if !features.Get().EnablePeerScorer {
|
||||
return p.deprecatedPeersToPrune()
|
||||
}
|
||||
connLimit := p.ConnectedPeerLimit()
|
||||
inBoundLimit := p.InboundLimit()
|
||||
activePeers := p.Active()
|
||||
numInboundPeers := len(p.InboundConnected())
|
||||
// Exit early if we are still below our max
|
||||
// limit.
|
||||
if len(activePeers) <= int(connLimit) {
|
||||
return []peer.ID{}
|
||||
}
|
||||
p.store.Lock()
|
||||
defer p.store.Unlock()
|
||||
|
||||
type peerResp struct {
|
||||
pid peer.ID
|
||||
score float64
|
||||
}
|
||||
peersToPrune := make([]*peerResp, 0)
|
||||
// Select connected and inbound peers to prune.
|
||||
for pid, peerData := range p.store.Peers() {
|
||||
if peerData.ConnState == PeerConnected &&
|
||||
peerData.Direction == network.DirInbound {
|
||||
peersToPrune = append(peersToPrune, &peerResp{
|
||||
pid: pid,
|
||||
score: p.scorers.ScoreNoLock(pid),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Sort in ascending order to favour pruning peers with a
|
||||
// lower score.
|
||||
sort.Slice(peersToPrune, func(i, j int) bool {
|
||||
return peersToPrune[i].score < peersToPrune[j].score
|
||||
})
|
||||
|
||||
// Determine amount of peers to prune using our
|
||||
// max connection limit.
|
||||
amountToPrune := len(activePeers) - int(connLimit)
|
||||
|
||||
// Also check for inbound peers above our limit.
|
||||
excessInbound := 0
|
||||
if numInboundPeers > inBoundLimit {
|
||||
excessInbound = numInboundPeers - inBoundLimit
|
||||
}
|
||||
// Prune the largest amount between excess peers and
|
||||
// excess inbound peers.
|
||||
if excessInbound > amountToPrune {
|
||||
amountToPrune = excessInbound
|
||||
}
|
||||
if amountToPrune < len(peersToPrune) {
|
||||
peersToPrune = peersToPrune[:amountToPrune]
|
||||
}
|
||||
ids := make([]peer.ID, 0, len(peersToPrune))
|
||||
for _, pr := range peersToPrune {
|
||||
ids = append(ids, pr.pid)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
// Deprecated: Is used to represent the older method
|
||||
// of pruning which utilized bad response counts.
|
||||
func (p *Status) deprecatedPeersToPrune() []peer.ID {
|
||||
connLimit := p.ConnectedPeerLimit()
|
||||
inBoundLimit := p.InboundLimit()
|
||||
activePeers := p.Active()
|
||||
@@ -724,7 +846,6 @@ func (p *Status) PeersToPrune() []peer.ID {
|
||||
// Determine amount of peers to prune using our
|
||||
// max connection limit.
|
||||
amountToPrune := len(activePeers) - int(connLimit)
|
||||
|
||||
// Also check for inbound peers above our limit.
|
||||
excessInbound := 0
|
||||
if numInboundPeers > inBoundLimit {
|
||||
@@ -768,10 +889,9 @@ func (p *Status) ConnectedPeerLimit() uint64 {
|
||||
return uint64(maxLim) - maxLimitBuffer
|
||||
}
|
||||
|
||||
// this method assumes the store lock is acquired before
|
||||
// executing the method.
|
||||
func (p *Status) isfromBadIP(pid peer.ID) bool {
|
||||
p.store.RLock()
|
||||
defer p.store.RUnlock()
|
||||
|
||||
peerData, ok := p.store.PeerData(pid)
|
||||
if !ok {
|
||||
return false
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
|
||||
"github.com/prysmaticlabs/prysm/config/features"
|
||||
"github.com/prysmaticlabs/prysm/config/params"
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
@@ -548,6 +549,10 @@ func TestPrune(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPeerIPTracker(t *testing.T) {
|
||||
resetCfg := features.InitWithReset(&features.Flags{
|
||||
EnablePeerScorer: false,
|
||||
})
|
||||
defer resetCfg()
|
||||
maxBadResponses := 2
|
||||
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
PeerLimit: 30,
|
||||
@@ -686,6 +691,10 @@ func TestAtInboundPeerLimit(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPrunePeers(t *testing.T) {
|
||||
resetCfg := features.InitWithReset(&features.Flags{
|
||||
EnablePeerScorer: false,
|
||||
})
|
||||
defer resetCfg()
|
||||
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
PeerLimit: 30,
|
||||
ScorerParams: &scorers.Config{
|
||||
|
||||
@@ -58,7 +58,11 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID) {
|
||||
if !s.cfg.P2P.Peers().IsBad(id) {
|
||||
return
|
||||
}
|
||||
goodbyeCode := p2ptypes.ErrToGoodbyeCode(s.cfg.P2P.Peers().Scorers().ValidationError(id))
|
||||
err := s.cfg.P2P.Peers().Scorers().ValidationError(id)
|
||||
goodbyeCode := p2ptypes.ErrToGoodbyeCode(err)
|
||||
if err == nil {
|
||||
goodbyeCode = p2ptypes.GoodbyeCodeBanned
|
||||
}
|
||||
if err := s.sendGoodByeAndDisconnect(ctx, goodbyeCode, id); err != nil {
|
||||
log.Debugf("Error when disconnecting with bad peer: %v", err)
|
||||
}
|
||||
|
||||
@@ -56,6 +56,10 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
|
||||
if err := helpers.ValidateNilAttestation(m.Message.Aggregate); err != nil {
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
// Do not process slot 0 aggregates.
|
||||
if m.Message.Aggregate.Data.Slot == 0 {
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
|
||||
// Broadcast the aggregated attestation on a feed to notify other services in the beacon node
|
||||
// of a received aggregated attestation.
|
||||
|
||||
@@ -316,6 +316,7 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
|
||||
aggBits.SetBitAt(0, true)
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
BeaconBlockRoot: root[:],
|
||||
Source: ðpb.Checkpoint{Epoch: 0, Root: bytesutil.PadTo([]byte("hello-world"), 32)},
|
||||
Target: ðpb.Checkpoint{Epoch: 0, Root: root[:]},
|
||||
@@ -357,7 +358,7 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
|
||||
P2P: p,
|
||||
DB: db,
|
||||
InitialSync: &mockSync.Sync{IsSyncing: false},
|
||||
Chain: &mock.ChainService{Genesis: time.Now(),
|
||||
Chain: &mock.ChainService{Genesis: time.Now().Add(-oneEpoch()),
|
||||
State: beaconState,
|
||||
ValidAttestation: true,
|
||||
FinalizedCheckPoint: ðpb.Checkpoint{
|
||||
@@ -410,6 +411,7 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
|
||||
aggBits.SetBitAt(0, true)
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
BeaconBlockRoot: root[:],
|
||||
Source: ðpb.Checkpoint{Epoch: 0, Root: bytesutil.PadTo([]byte("hello-world"), 32)},
|
||||
Target: ðpb.Checkpoint{Epoch: 0, Root: root[:]},
|
||||
@@ -450,7 +452,7 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
|
||||
P2P: p,
|
||||
DB: db,
|
||||
InitialSync: &mockSync.Sync{IsSyncing: false},
|
||||
Chain: &mock.ChainService{Genesis: time.Now(),
|
||||
Chain: &mock.ChainService{Genesis: time.Now().Add(-oneEpoch()),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
State: beaconState,
|
||||
ValidAttestation: true,
|
||||
|
||||
@@ -61,7 +61,10 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
if err := helpers.ValidateNilAttestation(att); err != nil {
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
|
||||
// Do not process slot 0 attestations.
|
||||
if att.Data.Slot == 0 {
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
|
||||
// of a received unaggregated attestation.
|
||||
s.cfg.AttestationNotifier.OperationFeed().Send(&feed.Event{
|
||||
|
||||
@@ -144,6 +144,7 @@ var (
|
||||
// devModeFlags holds list of flags that are set when development mode is on.
|
||||
var devModeFlags = []cli.Flag{
|
||||
enableLargerGossipHistory,
|
||||
enablePeerScorer,
|
||||
forceOptMaxCoverAggregationStategy,
|
||||
enableBatchGossipVerification,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user