Deprecates --disable-init-sync-wrr flag (#6400)

* deprecates wrr-related flag

* gofmt + gazelle

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Author:    Victor Farazdagi
Date:      2020-06-25 18:26:30 +03:00
Committer: GitHub
Parent:    90bfc9a395
Commit:    00f24f5729

6 changed files with 33 additions and 243 deletions

File: beacon-chain/sync/initial-sync/BUILD.bazel

@@ -28,7 +28,6 @@ go_library(
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/mathutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
@@ -114,7 +113,6 @@ go_test(
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
@@ -122,7 +120,6 @@ go_test(
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

File: beacon-chain/sync/initial-sync/blocks_fetcher.go

@@ -1,7 +1,6 @@
 package initialsync

 import (
-	"bytes"
 	"context"
 	"crypto/rand"
 	"fmt"
@@ -23,7 +22,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
@@ -235,7 +233,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
 	}
 	headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
-	root, finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
+	_, finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
 	if len(peers) == 0 {
 		response.err = errNoPeersAvailable
@@ -250,21 +248,17 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
 		return response
 	}

-	if featureconfig.Get().EnableInitSyncWeightedRoundRobin {
-		response.blocks, response.err = f.fetchBlocksFromSinglePeer(ctx, start, count, peers)
-	} else {
-		response.blocks, response.err = f.fetchBlocksFromPeers(ctx, root, finalizedEpoch, start, 1, count, peers)
-	}
+	response.blocks, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
 	return response
 }

-// fetchBlocksFromSinglePeer fetches blocks from a single randomly selected peer.
-func (f *blocksFetcher) fetchBlocksFromSinglePeer(
+// fetchBlocksFromPeer fetches blocks from a single randomly selected peer.
+func (f *blocksFetcher) fetchBlocksFromPeer(
 	ctx context.Context,
 	start, count uint64,
 	peers []peer.ID,
 ) ([]*eth.SignedBeaconBlock, error) {
-	ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromSinglePeer")
+	ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer")
 	defer span.End()

 	blocks := []*eth.SignedBeaconBlock{}
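With the flag gone, handleRequest has a single code path: pick one random peer from the best-finalized set and ask it for the whole batch. A minimal standalone sketch of that strategy follows; Peer and Block are hypothetical stand-ins, not Prysm's blocksFetcher or libp2p types.

```go
// Sketch only: requestRange fakes a BeaconBlocksByRange stream with step=1.
package main

import (
	"fmt"
	"math/rand"
)

type Peer string
type Block struct{ Slot uint64 }

// fetchFromPeer mirrors the single-peer strategy: one random peer serves [start, start+count).
func fetchFromPeer(start, count uint64, peers []Peer) ([]Block, error) {
	if len(peers) == 0 {
		return nil, fmt.Errorf("no peers available")
	}
	pid := peers[rand.Intn(len(peers))] // the whole batch goes to a single peer
	return requestRange(pid, start, count)
}

func requestRange(pid Peer, start, count uint64) ([]Block, error) {
	blocks := make([]Block, 0, count)
	for slot := start; slot < start+count; slot++ {
		blocks = append(blocks, Block{Slot: slot})
	}
	fmt.Printf("peer %s served slots %d..%d\n", pid, start, start+count-1)
	return blocks, nil
}

func main() {
	blocks, err := fetchFromPeer(64, 8, []Peer{"p1", "p2", "p3"})
	if err != nil {
		panic(err)
	}
	fmt.Println("got", len(blocks), "blocks")
}
```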
@@ -289,160 +283,15 @@ func (f *blocksFetcher) fetchBlocksFromSinglePeer(
 	return blocks, nil
 }

-// fetchBlocksFromPeers orchestrates block fetching from the available peers.
-// Each request spreads a range of blocks across multiple peers.
-// Example:
-//   - number of peers = 4
-//   - range of block slots is 64...128
-// Four requests will be spread across the peers using the step argument to distribute load,
-// i.e. the first peer is asked for blocks 64, 68, 72..., the second for 65, 69, 73...,
-// and so on for the other peers.
-func (f *blocksFetcher) fetchBlocksFromPeers(
-	ctx context.Context,
-	root []byte,
-	finalizedEpoch, start, step, count uint64,
-	peers []peer.ID,
-) ([]*eth.SignedBeaconBlock, error) {
-	ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeers")
-	defer span.End()
-	if ctx.Err() != nil {
-		return []*eth.SignedBeaconBlock{}, ctx.Err()
-	}
-	peers, err := f.filterPeers(peers, peersPercentagePerRequest)
-	if err != nil {
-		return []*eth.SignedBeaconBlock{}, err
-	}
-	if len(peers) == 0 {
-		return []*eth.SignedBeaconBlock{}, errNoPeersAvailable
-	}
-	p2pRequests := new(sync.WaitGroup)
-	errChan := make(chan error)
-	blocksChan := make(chan []*eth.SignedBeaconBlock)
-	p2pRequests.Add(len(peers))
-	go func() {
-		p2pRequests.Wait()
-		close(blocksChan)
-	}()
-	// Short circuit to avoid spinning in an infinite loop when start exceeds the highest finalized slot.
-	highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
-	if start > highestFinalizedSlot {
-		return []*eth.SignedBeaconBlock{}, errSlotIsTooHigh
-	}
-	// Spread load evenly among available peers.
-	perPeerCount := mathutil.Min(count/uint64(len(peers)), f.blocksPerSecond)
-	remainder := int(count % uint64(len(peers)))
-	for i, pid := range peers {
-		start, step := start+uint64(i)*step, step*uint64(len(peers))
-		// If count does not divide evenly among the peers, the first `remainder`
-		// peers are each asked for one extra block to cover the difference.
-		peerCount := perPeerCount
-		if i < remainder {
-			peerCount++
-		}
-		// Asking for no blocks may cause the client to hang.
-		if peerCount == 0 {
-			p2pRequests.Done()
-			continue
-		}
-		go func(ctx context.Context, pid peer.ID) {
-			defer p2pRequests.Done()
-			blocks, err := f.requestBeaconBlocksByRange(ctx, pid, root, start, step, peerCount)
-			if err != nil {
-				select {
-				case <-ctx.Done():
-				case errChan <- err:
-					return
-				}
-			}
-			select {
-			case <-ctx.Done():
-			case blocksChan <- blocks:
-			}
-		}(ctx, pid)
-	}
-	var unionRespBlocks []*eth.SignedBeaconBlock
-	for {
-		select {
-		case <-ctx.Done():
-			return []*eth.SignedBeaconBlock{}, ctx.Err()
-		case err := <-errChan:
-			return []*eth.SignedBeaconBlock{}, err
-		case resp, ok := <-blocksChan:
-			if ok {
-				unionRespBlocks = append(unionRespBlocks, resp...)
-			} else {
-				sort.Slice(unionRespBlocks, func(i, j int) bool {
-					return unionRespBlocks[i].Block.Slot < unionRespBlocks[j].Block.Slot
-				})
-				return unionRespBlocks, nil
-			}
-		}
-	}
-}
-
-// requestBeaconBlocksByRange prepares a BeaconBlocksByRange request and handles possibly
-// stale peers (by resending the request to a fail-over peer).
-func (f *blocksFetcher) requestBeaconBlocksByRange(
-	ctx context.Context,
-	pid peer.ID,
-	root []byte,
-	start, step, count uint64,
-) ([]*eth.SignedBeaconBlock, error) {
-	if ctx.Err() != nil {
-		return nil, ctx.Err()
-	}
-	req := &p2ppb.BeaconBlocksByRangeRequest{
-		StartSlot: start,
-		Count:     count,
-		Step:      step,
-	}
-	if featureconfig.Get().EnableInitSyncWeightedRoundRobin {
-		return f.requestBlocks(ctx, req, pid)
-	}
-	resp, respErr := f.requestBlocks(ctx, req, pid)
-	if respErr != nil {
-		// Fail over to some other, randomly selected, peer.
-		headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
-		root1, _, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
-		if bytes.Compare(root, root1) != 0 {
-			return nil, errors.Errorf("can not resend, root mismatch: %x:%x", root, root1)
-		}
-		newPID, err := f.selectFailOverPeer(pid, peers)
-		if err != nil {
-			return nil, err
-		}
-		log.WithError(respErr).WithFields(logrus.Fields{
-			"numPeers":   len(peers),
-			"failedPeer": pid.Pretty(),
-			"newPeer":    newPID.Pretty(),
-		}).Debug("Request failed, trying to forward request to another peer")
-		return f.requestBeaconBlocksByRange(ctx, newPID, root, start, step, count)
-	}
-	return resp, nil
-}
-
 // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
 func (f *blocksFetcher) requestBlocks(
 	ctx context.Context,
 	req *p2ppb.BeaconBlocksByRangeRequest,
 	pid peer.ID,
 ) ([]*eth.SignedBeaconBlock, error) {
 	if ctx.Err() != nil {
 		return nil, ctx.Err()
 	}
 	l := f.getPeerLock(pid)
 	if l == nil {
 		return nil, errors.New("cannot obtain lock")
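For contrast with the single-peer path that replaces it, here is the slot arithmetic from the deleted fetchBlocksFromPeers doc comment, reduced to a runnable sketch; the peer names and counts are illustrative.

```go
// Sketch of the removed step-based distribution: with 4 peers and slots
// 64..127, peer i receives slots start+i, start+i+step, start+i+2*step, ...
package main

import "fmt"

func main() {
	peers := []string{"p1", "p2", "p3", "p4"}
	start, count := uint64(64), uint64(64)
	step := uint64(len(peers)) // effective step once spread over all peers
	perPeer := count / step
	for i, pid := range peers {
		first := start + uint64(i)
		last := first + (perPeer-1)*step
		fmt.Printf("%s: slots %d, %d, ..., %d (step %d)\n", pid, first, first+step, last, step)
	}
}
```

This prints p1 getting 64, 68, ..., 124 and p2 getting 65, 69, ..., 125, matching the example in the removed comment.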
@@ -595,18 +444,16 @@ func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([
 	limit = mathutil.Min(limit, uint64(len(peers)))
 	peers = peers[:limit]

-	if featureconfig.Get().EnableInitSyncWeightedRoundRobin {
-		// Order peers by remaining capacity, effectively turning in-order
-		// round robin peer processing into a weighted one (peers with higher
-		// remaining capacity are preferred). Peers with the same capacity
-		// are selected at random, since we have already shuffled peers
-		// at this point.
-		sort.SliceStable(peers, func(i, j int) bool {
-			cap1 := f.rateLimiter.Remaining(peers[i].String())
-			cap2 := f.rateLimiter.Remaining(peers[j].String())
-			return cap1 > cap2
-		})
-	}
+	// Order peers by remaining capacity, effectively turning in-order
+	// round robin peer processing into a weighted one (peers with higher
+	// remaining capacity are preferred). Peers with the same capacity
+	// are selected at random, since we have already shuffled peers
+	// at this point.
+	sort.SliceStable(peers, func(i, j int) bool {
+		cap1 := f.rateLimiter.Remaining(peers[i].String())
+		cap2 := f.rateLimiter.Remaining(peers[j].String())
+		return cap1 > cap2
+	})

 	return peers, nil
 }
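After this change the capacity-based ordering in filterPeers runs unconditionally. The same idea in isolation, with a plain map standing in for the leaky-bucket rate limiter's Remaining lookups; the peer names and capacity values are illustrative.

```go
// Sketch: peers with more remaining rate-limiter capacity sort first, so
// in-order processing becomes a weighted round robin.
package main

import (
	"fmt"
	"sort"
)

func main() {
	peers := []string{"p1", "p2", "p3"}
	remaining := map[string]int64{"p1": 100, "p2": 640, "p3": 320}

	// Stable sort: a prior random shuffle stays intact as the tie-breaker
	// for peers with equal capacity.
	sort.SliceStable(peers, func(i, j int) bool {
		return remaining[peers[i]] > remaining[peers[j]]
	})
	fmt.Println(peers) // [p2 p3 p1]
}
```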

File: beacon-chain/sync/initial-sync/blocks_fetcher_test.go

@@ -22,13 +22,10 @@ import (
 	p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
 	stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
 	p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
-	"github.com/prysmaticlabs/prysm/shared/featureconfig"
 	"github.com/prysmaticlabs/prysm/shared/params"
 	"github.com/prysmaticlabs/prysm/shared/roughtime"
 	"github.com/prysmaticlabs/prysm/shared/sliceutil"
-	"github.com/prysmaticlabs/prysm/shared/testutil"
 	"github.com/sirupsen/logrus"
-	logTest "github.com/sirupsen/logrus/hooks/test"
 )
func TestBlocksFetcher_InitStartStop(t *testing.T) {
@@ -392,23 +389,6 @@ func TestBlocksFetcher_scheduleRequest(t *testing.T) {
 	})
 }

 func TestBlocksFetcher_handleRequest(t *testing.T) {
-	// Handle using default configuration.
-	t.Run("default config", func(t *testing.T) {
-		_handleRequest(t)
-	})
-
-	// Now handle using the previous implementation, w/o WRR.
-	t.Run("previous config", func(t *testing.T) {
-		resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
-			EnableInitSyncWeightedRoundRobin: false,
-		})
-		defer resetCfg()
-		_handleRequest(t)
-	})
-}
-
-// TODO(6024): Move to TestBlocksFetcher_handleRequest when EnableInitSyncWeightedRoundRobin is released.
-func _handleRequest(t *testing.T) {
 	blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
 	chainConfig := struct {
 		expectedBlockSlots []uint64
@@ -512,7 +492,6 @@ func TestBlocksFetcher_requestBeaconBlocksByRange(t *testing.T) {
 		},
 	}

-	hook := logTest.NewGlobal()
 	mc, p2p, _ := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)

 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -524,9 +503,13 @@ func TestBlocksFetcher_requestBeaconBlocksByRange(t *testing.T) {
 		p2p: p2p,
 	})

-	root, _, peers := p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(mc.HeadSlot()))
-	blocks, err := fetcher.requestBeaconBlocksByRange(context.Background(), peers[0], root, 1, 1, blockBatchLimit)
+	_, _, peers := p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(mc.HeadSlot()))
+	req := &p2ppb.BeaconBlocksByRangeRequest{
+		StartSlot: 1,
+		Step:      1,
+		Count:     blockBatchLimit,
+	}
+	blocks, err := fetcher.requestBlocks(ctx, req, peers[0])
 	if err != nil {
 		t.Errorf("error: %v", err)
 	}
@@ -534,32 +517,10 @@ func TestBlocksFetcher_requestBeaconBlocksByRange(t *testing.T) {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchLimit, len(blocks))
}
if !featureconfig.Get().EnableInitSyncWeightedRoundRobin {
// Test request fail over (success).
err = fetcher.p2p.Disconnect(peers[0])
if err != nil {
t.Error(err)
}
blocks, err = fetcher.requestBeaconBlocksByRange(context.Background(), peers[0], root, 1, 1, blockBatchLimit)
if err != nil {
t.Errorf("error: %v", err)
}
// Test request fail over (error).
err = fetcher.p2p.Disconnect(peers[1])
ctx, cancel = context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
blocks, err = fetcher.requestBeaconBlocksByRange(ctx, peers[1], root, 1, 1, blockBatchLimit)
testutil.AssertLogsContain(t, hook, "Request failed, trying to forward request to another peer")
if err == nil || err.Error() != "context deadline exceeded" {
t.Errorf("expected context closed error, got: %v", err)
}
}
// Test context cancellation.
ctx, cancel = context.WithCancel(context.Background())
cancel()
blocks, err = fetcher.requestBeaconBlocksByRange(ctx, peers[0], root, 1, 1, blockBatchLimit)
blocks, err = fetcher.requestBlocks(ctx, req, peers[0])
if err == nil || err.Error() != "context canceled" {
t.Errorf("expected context closed error, got: %v", err)
}
@@ -731,9 +692,6 @@ func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) {
 }

 func TestBlocksFetcher_filterPeers(t *testing.T) {
-	if !featureconfig.Get().EnableInitSyncWeightedRoundRobin {
-		t.Skip("Test is run only when EnableInitSyncWeightedRoundRobin = true")
-	}
 	type weightedPeer struct {
 		peer.ID
 		usedCapacity int64

File: beacon-chain/sync/initial-sync/initial_sync_test.go

@@ -53,8 +53,7 @@ func TestMain(m *testing.M) {
 	logrus.SetOutput(ioutil.Discard)

 	resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
-		EnableInitSyncWeightedRoundRobin: true,
-		NewStateMgmt:                     true,
+		NewStateMgmt: true,
 	})
 	defer resetCfg()

File: shared/featureconfig/config.go

@@ -55,7 +55,6 @@ type Flags struct {
 	NoInitSyncBatchSaveBlocks        bool // NoInitSyncBatchSaveBlocks disables batch save blocks mode during initial syncing.
 	WaitForSynced                    bool // WaitForSynced uses WaitForSynced in validator startup to ensure it can communicate with the beacon node as soon as possible.
 	SkipRegenHistoricalStates        bool // SkipRegenHistoricalStates skips regenerating historical states from genesis to last finalized. This enables a quick switch over to using new-state-mgmt.
-	EnableInitSyncWeightedRoundRobin bool // EnableInitSyncWeightedRoundRobin enables weighted round robin fetching optimization in initial syncing.
 	ReduceAttesterStateCopy          bool // ReduceAttesterStateCopy reduces head state copies for attester rpc.
 	// DisableForkChoice disables using LMD-GHOST fork choice to update
 	// the head of the chain based on attestations and instead accepts any valid received block
@@ -202,11 +201,6 @@ func ConfigureBeaconChain(ctx *cli.Context) {
log.Warn("Enabling skipping of historical states regen")
cfg.SkipRegenHistoricalStates = true
}
cfg.EnableInitSyncWeightedRoundRobin = true
if ctx.Bool(disableInitSyncWeightedRoundRobin.Name) {
log.Warn("Disabling weighted round robin in initial syncing")
cfg.EnableInitSyncWeightedRoundRobin = false
}
if ctx.IsSet(deprecatedP2PWhitelist.Name) {
log.Warnf("--%s is deprecated, please use --%s", deprecatedP2PWhitelist.Name, cmd.P2PAllowList.Name)
if err := ctx.Set(cmd.P2PAllowList.Name, ctx.String(deprecatedP2PWhitelist.Name)); err != nil {
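The deleted block was the middle stage of a common flag lifecycle: the feature defaults to on, and an opt-out flag can still switch it off. A hedged sketch of that stage using urfave/cli/v2, which this file imports; the variable names and wiring here are illustrative, not Prysm's actual code.

```go
// Sketch only: a feature that is on by default, with a --disable-* opt-out.
package main

import (
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

var disableWRR = &cli.BoolFlag{
	Name:  "disable-init-sync-wrr",
	Usage: "Disables weighted round robin fetching optimization",
}

func main() {
	app := &cli.App{
		Flags: []cli.Flag{disableWRR},
		Action: func(ctx *cli.Context) error {
			enabled := true // default-on, as in the removed cfg.EnableInitSyncWeightedRoundRobin = true
			if ctx.Bool(disableWRR.Name) {
				log.Print("Disabling weighted round robin in initial syncing")
				enabled = false
			}
			log.Printf("weighted round robin enabled: %v", enabled)
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```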

File: shared/featureconfig/flags.go

@@ -139,10 +139,6 @@ var (
Name: "enable-stream-duties",
Usage: "Enables validator duties streaming in the validator client",
}
disableInitSyncWeightedRoundRobin = &cli.BoolFlag{
Name: "disable-init-sync-wrr",
Usage: "Disables weighted round robin fetching optimization",
}
disableGRPCConnectionLogging = &cli.BoolFlag{
Name: "disable-grpc-connection-logging",
Usage: "Disables displaying logs for newly connected grpc clients",
@@ -418,11 +414,6 @@ var (
 		Usage:  deprecatedUsage,
 		Hidden: true,
 	}
-	deprecatedEnableInitSyncWeightedRoundRobin = &cli.BoolFlag{
-		Name:   "enable-init-sync-wrr",
-		Usage:  deprecatedUsage,
-		Hidden: true,
-	}
 	deprecatedDisableStateRefCopy = &cli.BoolFlag{
 		Name:   "disable-state-ref-copy",
 		Usage:  deprecatedUsage,
@@ -433,6 +424,11 @@ var (
 		Usage:  deprecatedUsage,
 		Hidden: true,
 	}
+	deprecatedDisableInitSyncWeightedRoundRobin = &cli.BoolFlag{
+		Name:   "disable-init-sync-wrr",
+		Usage:  deprecatedUsage,
+		Hidden: true,
+	}
 )

 var deprecatedFlags = []cli.Flag{
@@ -485,7 +481,7 @@ var deprecatedFlags = []cli.Flag{
 	deprecatedP2PBlacklist,
 	deprecatedSchlesiTestnetFlag,
 	deprecateReduceAttesterStateCopies,
-	deprecatedEnableInitSyncWeightedRoundRobin,
+	deprecatedDisableInitSyncWeightedRoundRobin,
 	deprecatedDisableStateRefCopy,
 	deprecatedDisableFieldTrie,
 }
@@ -538,7 +534,6 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
 	disableInitSyncBatchSaveBlocks,
 	waitForSyncedFlag,
 	skipRegenHistoricalStates,
-	disableInitSyncWeightedRoundRobin,
 	disableNewStateMgmt,
 	disableReduceAttesterStateCopy,
 	disableGRPCConnectionLogging,
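The hunks above complete the lifecycle: the opt-out flag's definition moves into the hidden deprecated set, so stale command lines still parse while the flag no longer drives any behavior. A sketch of that retirement step under the same urfave/cli/v2 assumption; the deprecatedUsage constant and names here are illustrative.

```go
// Sketch only: a retired flag kept parseable but hidden and inert.
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

const deprecatedUsage = "DEPRECATED. DO NOT USE."

var deprecatedDisableWRR = &cli.BoolFlag{
	Name:   "disable-init-sync-wrr",
	Usage:  deprecatedUsage,
	Hidden: true, // kept out of --help output
}

func main() {
	app := &cli.App{
		Flags: []cli.Flag{deprecatedDisableWRR},
		Action: func(ctx *cli.Context) error {
			// The flag parses without error but no longer does anything.
			if ctx.Bool(deprecatedDisableWRR.Name) {
				fmt.Println("--disable-init-sync-wrr is deprecated and has no effect")
			}
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```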