Add Flag To Parameterize Number of Peers in A Subnet (#9631)

* add flag

* use old value

* gaz

* raul's review
This commit is contained in:
Nishant Das
2021-09-21 15:55:52 +08:00
committed by GitHub
parent 28364d9f33
commit b943f7bce5
17 changed files with 46 additions and 29 deletions

View File

@@ -142,6 +142,7 @@ go_test(
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//crypto/hash:go_default_library",
"//network:go_default_library",
"//network/forks:go_default_library",

View File

@@ -77,7 +77,7 @@ type PeerManager interface {
ENR() *enr.Record
DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
RefreshENR()
FindPeersWithSubnet(ctx context.Context, topic string, index, threshold uint64) (bool, error)
FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error)
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}

View File

@@ -34,7 +34,7 @@ const syncLockerVal = 100
// with those peers. This method will block until the required amount of
// peers are found, the method only exits in the event of context timeouts.
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
index, threshold uint64) (bool, error) {
index uint64, threshold int) (bool, error) {
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
defer span.End()
@@ -56,7 +56,7 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
return false, errors.New("no subnet exists for provided topic")
}
currNum := uint64(len(s.pubsub.ListPeers(topic)))
currNum := len(s.pubsub.ListPeers(topic))
wg := new(sync.WaitGroup)
for {
if err := ctx.Err(); err != nil {
@@ -81,7 +81,7 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
}
// Wait for all dials to be completed.
wg.Wait()
currNum = uint64(len(s.pubsub.ListPeers(topic)))
currNum = len(s.pubsub.ListPeers(topic))
}
return true, nil
}

View File

@@ -16,9 +16,9 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
@@ -26,6 +26,11 @@ import (
func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
// This test needs to be entirely rewritten and should be done in a follow up PR from #7885.
t.Skip("This test is now failing after PR 7885 due to false positive")
gFlags := new(flags.GlobalFlags)
gFlags.MinimumPeersPerSubnet = 4
flags.Init(gFlags)
// Reset config.
defer flags.Init(new(flags.GlobalFlags))
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
genesisTime := time.Now()
@@ -111,11 +116,11 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
// look up 3 different subnets
ctx := context.Background()
exists, err := s.FindPeersWithSubnet(ctx, "", 1, params.BeaconNetworkConfig().MinimumPeersInSubnet)
exists, err := s.FindPeersWithSubnet(ctx, "", 1, flags.Get().MinimumPeersPerSubnet)
require.NoError(t, err)
exists2, err := s.FindPeersWithSubnet(ctx, "", 2, params.BeaconNetworkConfig().MinimumPeersInSubnet)
exists2, err := s.FindPeersWithSubnet(ctx, "", 2, flags.Get().MinimumPeersPerSubnet)
require.NoError(t, err)
exists3, err := s.FindPeersWithSubnet(ctx, "", 3, params.BeaconNetworkConfig().MinimumPeersInSubnet)
exists3, err := s.FindPeersWithSubnet(ctx, "", 3, flags.Get().MinimumPeersPerSubnet)
require.NoError(t, err)
if !exists || !exists2 || !exists3 {
t.Fatal("Peer with subnet doesn't exist")
@@ -132,7 +137,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
testService.RefreshENR()
time.Sleep(2 * time.Second)
exists, err = s.FindPeersWithSubnet(ctx, "", 2, params.BeaconNetworkConfig().MinimumPeersInSubnet)
exists, err = s.FindPeersWithSubnet(ctx, "", 2, flags.Get().MinimumPeersPerSubnet)
require.NoError(t, err)
assert.Equal(t, true, exists, "Peer with subnet doesn't exist")

View File

@@ -61,7 +61,7 @@ func (p *FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
}
// FindPeersWithSubnet mocks the p2p func.
func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
return false, nil
}

View File

@@ -51,7 +51,7 @@ func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
func (m MockPeerManager) RefreshENR() {}
// FindPeersWithSubnet .
func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
return true, nil
}

View File

@@ -349,7 +349,7 @@ func (p *TestP2P) Peers() *peers.Status {
}
// FindPeersWithSubnet mocks the p2p func.
func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
return false, nil
}

View File

@@ -316,7 +316,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.Vali
s.ctx,
s.addDigestAndIndexToTopic(topic, digest, i),
i,
params.BeaconNetworkConfig().MinimumPeersInSubnet,
flags.Get().MinimumPeersPerSubnet,
)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
@@ -431,7 +431,7 @@ func (s *Service) subscribeAggregatorSubnet(
if !s.validPeersExist(subnetTopic) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
_, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet)
_, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
@@ -457,7 +457,7 @@ func (s *Service) subscribeSyncSubnet(
if !s.validPeersExist(subnetTopic) {
log.Debugf("No peers found subscribed to sync gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
_, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet)
_, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
@@ -516,7 +516,7 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator pubsub.
s.ctx,
s.addDigestAndIndexToTopic(topic, digest, i),
i,
params.BeaconNetworkConfig().MinimumPeersInSubnet,
flags.Get().MinimumPeersPerSubnet,
)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
@@ -595,7 +595,7 @@ func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
// perform a search for peers with the desired committee index.
_, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet)
_, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
@@ -620,7 +620,7 @@ func (s *Service) unSubscribeFromTopic(topic string) {
// find if we have peers who are subscribed to the same subnet
func (s *Service) validPeersExist(subnetTopic string) bool {
numOfPeers := s.cfg.P2P.PubSub().ListPeers(subnetTopic + s.cfg.P2P.Encoding().ProtocolSuffix())
return uint64(len(numOfPeers)) >= params.BeaconNetworkConfig().MinimumPeersInSubnet
return len(numOfPeers) >= flags.Get().MinimumPeersPerSubnet
}
func (s *Service) retrievePersistentSubs(currSlot types.Slot) []uint64 {
@@ -661,10 +661,10 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
for _, sub := range wantedSubs {
subnetTopic := fmt.Sprintf(topic, digest, sub) + s.cfg.P2P.Encoding().ProtocolSuffix()
peers := s.cfg.P2P.PubSub().ListPeers(subnetTopic)
if len(peers) > int(params.BeaconNetworkConfig().MinimumPeersInSubnet) {
if len(peers) > flags.Get().MinimumPeersPerSubnet {
// In the event we have more than the minimum, we can
// mark the remaining as viable for pruning.
peers = peers[:params.BeaconNetworkConfig().MinimumPeersInSubnet]
peers = peers[:flags.Get().MinimumPeersPerSubnet]
}
// Add peer to peer map.
for _, p := range peers {

View File

@@ -24,6 +24,7 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
lruwrpr "github.com/prysmaticlabs/prysm/cache/lru"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/network/forks"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -433,6 +434,11 @@ func Test_wrapAndReportValidation(t *testing.T) {
}
func TestFilterSubnetPeers(t *testing.T) {
gFlags := new(flags.GlobalFlags)
gFlags.MinimumPeersPerSubnet = 4
flags.Init(gFlags)
// Reset config.
defer flags.Init(new(flags.GlobalFlags))
p := p2ptest.NewTestP2P(t)
ctx, cancel := context.WithCancel(context.Background())
currSlot := types.Slot(100)
@@ -481,7 +487,7 @@ func TestFilterSubnetPeers(t *testing.T) {
// Try with only peers from subnet 20.
wantedPeers = []peer.ID{p2.BHost.ID()}
// Connect an excess amount of peers in the particular subnet.
for i := uint64(1); i <= params.BeaconNetworkConfig().MinimumPeersInSubnet; i++ {
for i := 1; i <= flags.Get().MinimumPeersPerSubnet; i++ {
nPeer := createPeer(t, subnet20)
p.Connect(nPeer)
wantedPeers = append(wantedPeers, nPeer.BHost.ID())

View File

@@ -143,10 +143,10 @@ var (
Name: "enable-debug-rpc-endpoints",
Usage: "Enables the debug rpc service, containing utility endpoints such as /eth/v1alpha1/beacon/state.",
}
// SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation subnets or not.
// SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation/sync subnets or not.
SubscribeToAllSubnets = &cli.BoolFlag{
Name: "subscribe-all-subnets",
Usage: "Subscribe to all possible attestation subnets.",
Usage: "Subscribe to all possible attestation and sync subnets.",
}
// HistoricalSlasherNode is a set of beacon node flags required for performing historical detection with a slasher.
HistoricalSlasherNode = &cli.BoolFlag{
@@ -182,4 +182,10 @@ var (
Usage: "Load a genesis state from ssz file. Testnet genesis files can be found in the " +
"eth2-clients/eth2-testnets repository on github.",
}
// MinPeersPerSubnet defines a flag to set the minimum number of peers that a node will attempt to peer with for a subnet.
MinPeersPerSubnet = &cli.Uint64Flag{
Name: "minimum-peers-per-subnet",
Usage: "Sets the minimum number of peers that a node will attempt to peer with that are subscribed to a subnet.",
Value: 4,
}
)

View File

@@ -13,6 +13,7 @@ type GlobalFlags struct {
DisableDiscv5 bool
SubscribeToAllSubnets bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
BlockBatchLimit int
BlockBatchLimitBurstFactor int
}
@@ -51,6 +52,7 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
cfg.DisableDiscv5 = ctx.Bool(DisableDiscv5.Name)
cfg.BlockBatchLimit = ctx.Int(BlockBatchLimit.Name)
cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name)
cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
configureMinimumPeers(ctx, cfg)
Init(cfg)

View File

@@ -62,6 +62,7 @@ var appFlags = []cli.Flag{
flags.WeakSubjectivityCheckpt,
flags.Eth1HeaderReqLimit,
flags.GenesisStatePath,
flags.MinPeersPerSubnet,
cmd.EnableBackupWebhookFlag,
cmd.BackupWebhookOutputDir,
cmd.MinimalConfigFlag,

View File

@@ -121,6 +121,7 @@ var appHelpFlagGroups = []flagGroup{
flags.WeakSubjectivityCheckpt,
flags.Eth1HeaderReqLimit,
flags.GenesisStatePath,
flags.MinPeersPerSubnet,
},
},
{

View File

@@ -40,7 +40,6 @@ var mainnetNetworkConfig = &NetworkConfig{
ETH2Key: "eth2",
AttSubnetKey: "attnets",
SyncCommsSubnetKey: "syncnets",
MinimumPeersInSubnet: 4,
MinimumPeersInSubnetSearch: 20,
ContractDeploymentBlock: 11184524, // Note: contract was deployed in block 11052984 but no transactions were sent until 11184524.
BootstrapNodes: []string{

View File

@@ -24,7 +24,6 @@ type NetworkConfig struct {
ETH2Key string // ETH2Key is the ENR key of the Ethereum consensus object in an enr.
AttSubnetKey string // AttSubnetKey is the ENR key of the subnet bitfield in the enr.
SyncCommsSubnetKey string // SyncCommsSubnetKey is the ENR key of the sync committee subnet bitfield in the enr.
MinimumPeersInSubnet uint64 // MinimumPeersInSubnet is the required amount of peers that a node is to have its in subnet.
MinimumPeersInSubnetSearch uint64 // PeersInSubnetSearch is the required amount of peers that we need to be able to lookup in a subnet search.
// Chain Network Config

View File

@@ -7,10 +7,6 @@ func UseE2EConfig() {
beaconConfig = E2ETestConfig()
cfg := BeaconNetworkConfig().Copy()
// Due to the small number of peers in the e2e test network
// setting this to 0, prevents the node from being overwhelmed
// from discovery lookups.
cfg.MinimumPeersInSubnet = 0
OverrideBeaconNetworkConfig(cfg)
}

View File

@@ -116,6 +116,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%d", flags.GRPCGatewayPort.Name, e2e.TestParams.BeaconNodeRPCPort+index+40),
fmt.Sprintf("--%s=%d", flags.EthApiPort.Name, e2e.TestParams.BeaconNodeRPCPort+index+30),
fmt.Sprintf("--%s=%d", flags.ContractDeploymentBlock.Name, 0),
fmt.Sprintf("--%s=%d", flags.MinPeersPerSubnet.Name, 0),
fmt.Sprintf("--%s=%d", cmdshared.RPCMaxPageSizeFlag.Name, params.BeaconConfig().MinGenesisActiveValidatorCount),
fmt.Sprintf("--%s=%s", cmdshared.BootstrapNode.Name, enr),
fmt.Sprintf("--%s=%s", cmdshared.VerbosityFlag.Name, "debug"),