Implements WRR in init-sync, full bandwidth utilization (#5887)

* implements weighted round robin in init-sync
* protection against evil peer
* reshuffle
* shorten flag
* deterministic order of wrr
* Merge branch 'master' into init-sync-wrr
* Merge refs/heads/master into init-sync-wrr
* Merge refs/heads/master into init-sync-wrr
* allow peers with the same capacity to be selected at random (see the selection sketch below)
* Merge branch 'init-sync-wrr' of github.com:prysmaticlabs/prysm into init-sync-wrr
* adds wrr to e2e tests
* Merge refs/heads/master into init-sync-wrr
* Merge refs/heads/master into init-sync-wrr
* Merge refs/heads/master into init-sync-wrr
Victor Farazdagi
2020-05-18 21:59:03 +03:00
committed by GitHub
parent 52105e6083
commit ca26745720
11 changed files with 337 additions and 212 deletions
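
A note on the selection logic exercised by the tests below: per-peer usage is tracked in a leaky bucket (github.com/kevinms/leakybucket-go), peers with more remaining capacity are preferred, and peers with equal remaining capacity are picked in random order (the "same capacity" commit note above). The following is a minimal standalone sketch of that idea, not the filterPeers implementation committed here; filterPeersSketch, its trimming rule, and the example data are illustrative assumptions.

package main

import (
	"fmt"
	"math"
	"math/rand"
	"sort"

	"github.com/kevinms/leakybucket-go"
)

// filterPeersSketch illustrates capacity-weighted selection: shuffle first so that
// equally loaded peers end up in random relative order, then stable-sort by remaining
// leaky-bucket capacity, and keep only the requested fraction of peers.
func filterPeersSketch(limiter *leakybucket.Collector, peers []string, pct float64) []string {
	rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
	sort.SliceStable(peers, func(i, j int) bool {
		return limiter.Remaining(peers[i]) > limiter.Remaining(peers[j])
	})
	limit := int(math.Round(float64(len(peers)) * pct))
	if limit < 1 {
		limit = 1
	}
	if limit > len(peers) {
		limit = len(peers)
	}
	return peers[:limit]
}

func main() {
	// Same collector parameters as the tests: bucket capacity 100, negligible leak rate.
	limiter := leakybucket.NewCollector(0.000001, 100, false)
	peers := []string{"abc", "def", "xyz"}
	for _, pid := range peers {
		limiter.Add(pid, 10) // every peer has used the same amount of its budget
	}
	// With equal remaining capacity, any ordering of the three peers is possible.
	fmt.Println(filterPeersSketch(limiter, peers, 1.0))
}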


@@ -2,19 +2,23 @@ package initialsync
import (
"context"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/peer"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -86,6 +90,17 @@ func TestBlocksFetcherInitStartStop(t *testing.T) {
}
func TestBlocksFetcherRoundRobin(t *testing.T) {
blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
requestsGenerator := func(start, end uint64, batchSize uint64) []*fetchRequestParams {
var requests []*fetchRequestParams
for i := start; i <= end; i += batchSize {
requests = append(requests, &fetchRequestParams{
start: i,
count: batchSize,
})
}
return requests
}
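// Illustration only: requestsGenerator slices the slot range [start, end] into evenly
// sized batches. Assuming blockBatchLimit == 64 (it is whatever flags.Get().BlockBatchLimit
// returns), the first test case below expands to:
//
//	requestsGenerator(1, 3*64, 64)
//	// => []*fetchRequestParams{
//	//        {start: 1, count: 64},
//	//        {start: 65, count: 64},
//	//        {start: 129, count: 64},
//	//    }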
tests := []struct {
name string
expectedBlockSlots []uint64
@@ -94,110 +109,52 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
}{
{
name: "Single peer with all blocks",
expectedBlockSlots: makeSequence(1, 3*blockBatchSize),
expectedBlockSlots: makeSequence(1, 3*blockBatchLimit),
peers: []*peerData{
{
blocks: makeSequence(1, 131),
finalizedEpoch: 3,
headSlot: 131,
},
},
requests: []*fetchRequestParams{
{
start: 1,
count: blockBatchSize,
},
{
start: blockBatchSize + 1,
count: blockBatchSize,
},
{
start: 2*blockBatchSize + 1,
count: blockBatchSize,
blocks: makeSequence(1, 3*blockBatchLimit),
finalizedEpoch: helpers.SlotToEpoch(3 * blockBatchLimit),
headSlot: 3 * blockBatchLimit,
},
},
requests: requestsGenerator(1, 3*blockBatchLimit, blockBatchLimit),
},
{
name: "Single peer with all blocks (many small requests)",
expectedBlockSlots: makeSequence(1, 80),
expectedBlockSlots: makeSequence(1, 3*blockBatchLimit),
peers: []*peerData{
{
blocks: makeSequence(1, 131),
finalizedEpoch: 3,
headSlot: 131,
},
},
requests: []*fetchRequestParams{
{
start: 1,
count: blockBatchSize / 2,
},
{
start: blockBatchSize/2 + 1,
count: blockBatchSize / 2,
},
{
start: 2*blockBatchSize/2 + 1,
count: blockBatchSize / 2,
},
{
start: 3*blockBatchSize/2 + 1,
count: blockBatchSize / 2,
},
{
start: 4*blockBatchSize/2 + 1,
count: blockBatchSize / 2,
blocks: makeSequence(1, 3*blockBatchLimit),
finalizedEpoch: helpers.SlotToEpoch(3 * blockBatchLimit),
headSlot: 3 * blockBatchLimit,
},
},
requests: requestsGenerator(1, 3*blockBatchLimit, blockBatchLimit/4),
},
{
name: "Multiple peers with all blocks",
expectedBlockSlots: makeSequence(1, 96), // up to 4th epoch
expectedBlockSlots: makeSequence(1, 3*blockBatchLimit),
peers: []*peerData{
{
blocks: makeSequence(1, 131),
finalizedEpoch: 3,
headSlot: 131,
blocks: makeSequence(1, 3*blockBatchLimit),
finalizedEpoch: helpers.SlotToEpoch(3 * blockBatchLimit),
headSlot: 3 * blockBatchLimit,
},
{
blocks: makeSequence(1, 131),
finalizedEpoch: 3,
headSlot: 131,
blocks: makeSequence(1, 3*blockBatchLimit),
finalizedEpoch: helpers.SlotToEpoch(3 * blockBatchLimit),
headSlot: 3 * blockBatchLimit,
},
{
blocks: makeSequence(1, 131),
finalizedEpoch: 3,
headSlot: 131,
},
{
blocks: makeSequence(1, 131),
finalizedEpoch: 3,
headSlot: 131,
},
{
blocks: makeSequence(1, 131),
finalizedEpoch: 3,
headSlot: 131,
},
},
requests: []*fetchRequestParams{
{
start: 1,
count: blockBatchSize,
},
{
start: blockBatchSize + 1,
count: blockBatchSize,
},
{
start: 2*blockBatchSize + 1,
count: blockBatchSize,
blocks: makeSequence(1, 3*blockBatchLimit),
finalizedEpoch: helpers.SlotToEpoch(3 * blockBatchLimit),
headSlot: 3 * blockBatchLimit,
},
},
requests: requestsGenerator(1, 3*blockBatchLimit, blockBatchLimit),
},
{
name: "Multiple peers with skipped slots",
// finalizedEpoch(18).slot = 608
name: "Multiple peers with skipped slots",
expectedBlockSlots: append(makeSequence(1, 64), makeSequence(500, 640)...), // up to 18th epoch
peers: []*peerData{
{
@@ -229,15 +186,15 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
requests: []*fetchRequestParams{
{
start: 1,
count: blockBatchSize,
count: blockBatchLimit,
},
{
start: blockBatchSize + 1,
count: blockBatchSize,
start: blockBatchLimit + 1,
count: blockBatchLimit,
},
{
start: 2*blockBatchSize + 1,
count: blockBatchSize,
start: 2*blockBatchLimit + 1,
count: blockBatchLimit,
},
{
start: 500,
@@ -251,7 +208,7 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
},
{
name: "Multiple peers with failures",
expectedBlockSlots: makeSequence(1, 2*blockBatchSize),
expectedBlockSlots: makeSequence(1, 2*blockBatchLimit),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
@@ -278,11 +235,11 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
requests: []*fetchRequestParams{
{
start: 1,
count: blockBatchSize,
count: blockBatchLimit,
},
{
start: blockBatchSize + 1,
count: blockBatchSize,
start: blockBatchLimit + 1,
count: blockBatchLimit,
},
},
},
@@ -413,6 +370,7 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
}
func TestBlocksFetcherScheduleRequest(t *testing.T) {
blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
t.Run("context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
@@ -420,18 +378,19 @@ func TestBlocksFetcherScheduleRequest(t *testing.T) {
p2p: nil,
})
cancel()
if err := fetcher.scheduleRequest(ctx, 1, blockBatchSize); err == nil {
if err := fetcher.scheduleRequest(ctx, 1, blockBatchLimit); err == nil {
t.Errorf("expected error: %v", errFetcherCtxIsDone)
}
})
}
func TestBlocksFetcherHandleRequest(t *testing.T) {
blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
chainConfig := struct {
expectedBlockSlots []uint64
peers []*peerData
}{
expectedBlockSlots: makeSequence(1, blockBatchSize),
expectedBlockSlots: makeSequence(1, blockBatchLimit),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
@@ -456,7 +415,7 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
})
cancel()
response := fetcher.handleRequest(ctx, 1, blockBatchSize)
response := fetcher.handleRequest(ctx, 1, blockBatchLimit)
if response.err == nil {
t.Errorf("expected error: %v", errFetcherCtxIsDone)
}
@@ -472,7 +431,7 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
requestCtx, _ := context.WithTimeout(context.Background(), 2*time.Second)
go func() {
response := fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchSize /* count */)
response := fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchLimit /* count */)
select {
case <-ctx.Done():
case fetcher.fetchResponses <- response:
@@ -490,8 +449,8 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
blocks = resp.blocks
}
}
if len(blocks) != blockBatchSize {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks))
if uint64(len(blocks)) != blockBatchLimit {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchLimit, len(blocks))
}
var receivedBlockSlots []uint64
@@ -505,6 +464,7 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
}
func TestBlocksFetcherRequestBeaconBlocksByRangeRequest(t *testing.T) {
blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
chainConfig := struct {
expectedBlockSlots []uint64
peers []*peerData
@@ -538,37 +498,39 @@ func TestBlocksFetcherRequestBeaconBlocksByRangeRequest(t *testing.T) {
root, _, peers := p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(mc.HeadSlot()))
blocks, err := fetcher.requestBeaconBlocksByRange(context.Background(), peers[0], root, 1, 1, blockBatchSize)
blocks, err := fetcher.requestBeaconBlocksByRange(context.Background(), peers[0], root, 1, 1, blockBatchLimit)
if err != nil {
t.Errorf("error: %v", err)
}
if len(blocks) != blockBatchSize {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks))
if uint64(len(blocks)) != blockBatchLimit {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchLimit, len(blocks))
}
// Test request fail over (success).
err = fetcher.p2p.Disconnect(peers[0])
if err != nil {
t.Error(err)
}
blocks, err = fetcher.requestBeaconBlocksByRange(context.Background(), peers[0], root, 1, 1, blockBatchSize)
if err != nil {
t.Errorf("error: %v", err)
}
if !featureconfig.Get().EnableInitSyncWeightedRoundRobin {
// Test request fail over (success).
err = fetcher.p2p.Disconnect(peers[0])
if err != nil {
t.Error(err)
}
blocks, err = fetcher.requestBeaconBlocksByRange(context.Background(), peers[0], root, 1, 1, blockBatchLimit)
if err != nil {
t.Errorf("error: %v", err)
}
// Test request fail over (error).
err = fetcher.p2p.Disconnect(peers[1])
ctx, _ = context.WithTimeout(context.Background(), time.Second*1)
blocks, err = fetcher.requestBeaconBlocksByRange(ctx, peers[1], root, 1, 1, blockBatchSize)
testutil.AssertLogsContain(t, hook, "Request failed, trying to forward request to another peer")
if err == nil || err.Error() != "context deadline exceeded" {
t.Errorf("expected context closed error, got: %v", err)
// Test request fail over (error).
err = fetcher.p2p.Disconnect(peers[1])
ctx, _ = context.WithTimeout(context.Background(), time.Second*1)
blocks, err = fetcher.requestBeaconBlocksByRange(ctx, peers[1], root, 1, 1, blockBatchLimit)
testutil.AssertLogsContain(t, hook, "Request failed, trying to forward request to another peer")
if err == nil || err.Error() != "context deadline exceeded" {
t.Errorf("expected context closed error, got: %v", err)
}
}
// Test context cancellation.
ctx, cancel = context.WithCancel(context.Background())
cancel()
blocks, err = fetcher.requestBeaconBlocksByRange(ctx, peers[0], root, 1, 1, blockBatchSize)
blocks, err = fetcher.requestBeaconBlocksByRange(ctx, peers[0], root, 1, 1, blockBatchLimit)
if err == nil || err.Error() != "context canceled" {
t.Errorf("expected context closed error, got: %v", err)
}
@@ -579,6 +541,7 @@ func TestBlocksFetcherSelectFailOverPeer(t *testing.T) {
excludedPID peer.ID
peers []peer.ID
}
fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{})
tests := []struct {
name string
args args
@@ -641,7 +604,7 @@ func TestBlocksFetcherSelectFailOverPeer(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := selectFailOverPeer(tt.args.excludedPID, tt.args.peers)
got, _, err := fetcher.selectFailOverPeer(tt.args.excludedPID, tt.args.peers)
if err != nil && err != tt.wantErr {
t.Errorf("selectFailOverPeer() error = %v, wantErr %v", err, tt.wantErr)
return
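// The hunk above only shows the updated call site: selectFailOverPeer is now a method on
// the fetcher and returns an additional value. The sketch below is a hypothetical
// illustration of the fail-over idea only (drop the peer the request just failed on and
// pick a random replacement); the second return value (the pruned candidate list) and
// errNoPeersAvailable are assumptions, not the committed API.

import (
	"errors"
	"math/rand"

	"github.com/libp2p/go-libp2p-core/peer"
)

var errNoPeersAvailable = errors.New("no peers available")

func selectFailOverSketch(excluded peer.ID, peers []peer.ID) (peer.ID, []peer.ID, error) {
	candidates := make([]peer.ID, 0, len(peers))
	for _, p := range peers {
		if p != excluded {
			candidates = append(candidates, p)
		}
	}
	if len(candidates) == 0 {
		return "", nil, errNoPeersAvailable
	}
	return candidates[rand.Intn(len(candidates))], candidates, nil
}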
@@ -701,3 +664,91 @@ func TestBlocksFetcherNonSkippedSlotAfter(t *testing.T) {
}
}
}
func TestBlocksFetcherFilterPeers(t *testing.T) {
type weightedPeer struct {
peer.ID
usedCapacity int64
}
type args struct {
peers []weightedPeer
peersPercentage float64
}
fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{})
tests := []struct {
name string
args args
want []peer.ID
}{
{
name: "no peers available",
args: args{
peers: []weightedPeer{},
peersPercentage: 1.0,
},
want: []peer.ID{},
},
{
name: "single peer",
args: args{
peers: []weightedPeer{
{"abc", 10},
},
peersPercentage: 1.0,
},
want: []peer.ID{"abc"},
},
{
name: "multiple peers same capacity",
args: args{
peers: []weightedPeer{
{"abc", 10},
{"def", 10},
{"xyz", 10},
},
peersPercentage: 1.0,
},
want: []peer.ID{"abc", "def", "xyz"},
},
{
name: "multiple peers different capacity",
args: args{
peers: []weightedPeer{
{"abc", 20},
{"def", 15},
{"ghi", 10},
{"jkl", 90},
{"xyz", 20},
},
peersPercentage: 1.0,
},
want: []peer.ID{"ghi", "def", "abc", "xyz", "jkl"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Non-leaking bucket, with initial capacity of 100.
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 100, false)
pids := make([]peer.ID, 0)
for _, pid := range tt.args.peers {
pids = append(pids, pid.ID)
fetcher.rateLimiter.Add(pid.ID.String(), pid.usedCapacity)
}
got := fetcher.filterPeers(pids, tt.args.peersPercentage)
// Deterministically re-arrange peers that have the same remaining capacity.
// filterPeers deliberately shuffles such peers, so that any peer with equal
// capacity can be selected; sorting them here keeps the expected order stable.
sort.SliceStable(got, func(i, j int) bool {
cap1 := fetcher.rateLimiter.Remaining(pids[i].String())
cap2 := fetcher.rateLimiter.Remaining(pids[j].String())
if cap1 == cap2 {
return pids[i].String() < pids[j].String()
}
return i < j
})
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("filterPeers() got = %#v, want %#v", got, tt.want)
}
})
}
}
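
For the "multiple peers different capacity" case above, the expected ordering follows directly from each peer's remaining leaky-bucket capacity (bucket capacity 100 minus the amount already used). Below is a small worked example reusing the same collector parameters as the test; the program and its output comments are illustrative only.

package main

import (
	"fmt"

	"github.com/kevinms/leakybucket-go"
)

func main() {
	// Same collector parameters as the test: bucket capacity 100, negligible leak rate.
	limiter := leakybucket.NewCollector(0.000001, 100, false)
	used := []struct {
		pid  string
		load int64
	}{{"abc", 20}, {"def", 15}, {"ghi", 10}, {"jkl", 90}, {"xyz", 20}}
	for _, p := range used {
		limiter.Add(p.pid, p.load)
		fmt.Printf("%s remaining=%d\n", p.pid, limiter.Remaining(p.pid))
	}
	// Remaining capacities: ghi=90 > def=85 > abc=80 = xyz=80 > jkl=10, so the expected
	// order is ghi, def, then abc and xyz in either order (equal capacity, hence the
	// deterministic re-sort in the test), and jkl last.
}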