Init-sync: enable peer scorer by default (#7974)

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Victor Farazdagi
2020-12-03 03:45:20 +03:00
committed by GitHub
parent 323769bf1a
commit 5417e8cf31
4 changed files with 5 additions and 138 deletions

View File

@@ -16,7 +16,6 @@ import (
p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/sirupsen/logrus"
@@ -289,9 +288,7 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
}
for i := 0; i < len(peers); i++ {
if blocks, err := f.requestBlocks(ctx, req, peers[i]); err == nil {
if featureconfig.Get().EnablePeerScorer {
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i])
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i])
return blocks, peers[i], err
}
}

View File

@@ -3,7 +3,6 @@ package initialsync
import (
"context"
"math"
"sort"
"sync"
"time"
@@ -11,7 +10,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"
@@ -91,45 +89,10 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err
}
}
// filterPeers returns transformed list of peers, weight ordered or randomized, constrained
// if necessary (when only percentage of peers returned).
// When peer scorer is enabled, fallbacks filterScoredPeers.
func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID {
if featureconfig.Get().EnablePeerScorer {
return f.filterScoredPeers(ctx, peers, peersPercentagePerRequest)
}
if len(peers) == 0 {
return peers
}
// Shuffle peers to prevent a bad peer from
// stalling sync with invalid blocks.
f.rand.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
// Select sub-sample from peers (honoring min-max invariants).
peers = trimPeers(peers, peersPercentage)
// Order peers by remaining capacity, effectively turning in-order
// round robin peer processing into a weighted one (peers with higher
// remaining capacity are preferred). Peers with the same capacity
// are selected at random, since we have already shuffled peers
// at this point.
sort.SliceStable(peers, func(i, j int) bool {
cap1 := f.rateLimiter.Remaining(peers[i].String())
cap2 := f.rateLimiter.Remaining(peers[j].String())
return cap1 > cap2
})
return peers
}
// filterScoredPeers returns transformed list of peers, weight sorted by scores and capacity remaining.
// filterPeers returns transformed list of peers, weight sorted by scores and capacity remaining.
// List can be further constrained using peersPercentage, where only percentage of peers are returned.
func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID {
ctx, span := trace.StartSpan(ctx, "initialsync.filterScoredPeers")
func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) []peer.ID {
ctx, span := trace.StartSpan(ctx, "initialsync.filterPeers")
defer span.End()
if len(peers) == 0 {

View File

@@ -12,7 +12,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/shared/timeutils"
@@ -108,97 +107,6 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) {
}
func TestBlocksFetcher_filterPeers(t *testing.T) {
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
EnablePeerScorer: false,
})
defer resetCfg()
type weightedPeer struct {
peer.ID
usedCapacity int64
}
type args struct {
peers []weightedPeer
peersPercentage float64
}
fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{})
tests := []struct {
name string
args args
want []peer.ID
}{
{
name: "no peers available",
args: args{
peers: []weightedPeer{},
peersPercentage: 1.0,
},
want: []peer.ID{},
},
{
name: "single peer",
args: args{
peers: []weightedPeer{
{"a", 10},
},
peersPercentage: 1.0,
},
want: []peer.ID{"a"},
},
{
name: "multiple peers same capacity",
args: args{
peers: []weightedPeer{
{"a", 10},
{"b", 10},
{"c", 10},
},
peersPercentage: 1.0,
},
want: []peer.ID{"a", "b", "c"},
},
{
name: "multiple peers different capacity",
args: args{
peers: []weightedPeer{
{"a", 20},
{"b", 15},
{"c", 10},
{"d", 90},
{"e", 20},
},
peersPercentage: 1.0,
},
want: []peer.ID{"c", "b", "a", "e", "d"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Non-leaking bucket, with initial capacity of 100.
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 100, false)
pids := make([]peer.ID, 0)
for _, pid := range tt.args.peers {
pids = append(pids, pid.ID)
fetcher.rateLimiter.Add(pid.ID.String(), pid.usedCapacity)
}
pids = fetcher.filterPeers(context.Background(), pids, tt.args.peersPercentage)
// Re-arrange peers with the same remaining capacity, deterministically .
// They are deliberately shuffled - so that on the same capacity any of
// such peers can be selected. That's why they are sorted here.
sort.SliceStable(pids, func(i, j int) bool {
cap1 := fetcher.rateLimiter.Remaining(pids[i].String())
cap2 := fetcher.rateLimiter.Remaining(pids[j].String())
if cap1 == cap2 {
return pids[i].String() < pids[j].String()
}
return i < j
})
assert.DeepEqual(t, tt.want, pids)
})
}
}
func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
type weightedPeer struct {
peer.ID
usedCapacity int64

View File

@@ -14,7 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/sirupsen/logrus"
)
@@ -264,7 +263,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
// updatePeerScorerStats adjusts monitored metrics for a peer.
func (s *Service) updatePeerScorerStats(pid peer.ID, startSlot uint64) {
if !featureconfig.Get().EnablePeerScorer || pid == "" {
if pid == "" {
return
}
headSlot := s.chain.HeadSlot()