diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 66b3e91bd3..5e9c458b28 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -142,6 +142,7 @@ go_test( "//beacon-chain/p2p/peers/scorers:go_default_library", "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/p2p/types:go_default_library", + "//cmd/beacon-chain/flags:go_default_library", "//crypto/hash:go_default_library", "//network:go_default_library", "//network/forks:go_default_library", diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 6511ef9e7d..4dfa062a89 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -77,7 +77,7 @@ type PeerManager interface { ENR() *enr.Record DiscoveryAddresses() ([]multiaddr.Multiaddr, error) RefreshENR() - FindPeersWithSubnet(ctx context.Context, topic string, index, threshold uint64) (bool, error) + FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) } diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index faba77cebd..88b60983b8 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -34,7 +34,7 @@ const syncLockerVal = 100 // with those peers. This method will block until the required amount of // peers are found, the method only exits in the event of context timeouts. func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, - index, threshold uint64) (bool, error) { + index uint64, threshold int) (bool, error) { ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") defer span.End() @@ -56,7 +56,7 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, return false, errors.New("no subnet exists for provided topic") } - currNum := uint64(len(s.pubsub.ListPeers(topic))) + currNum := len(s.pubsub.ListPeers(topic)) wg := new(sync.WaitGroup) for { if err := ctx.Err(); err != nil { @@ -81,7 +81,7 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, } // Wait for all dials to be completed. wg.Wait() - currNum = uint64(len(s.pubsub.ListPeers(topic))) + currNum = len(s.pubsub.ListPeers(topic)) } return true, nil } diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index 1c7f8f58fb..5e165ae3d7 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -16,9 +16,9 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" + "github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags" pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper" - "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" ) @@ -26,6 +26,11 @@ import ( func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { // This test needs to be entirely rewritten and should be done in a follow up PR from #7885. t.Skip("This test is now failing after PR 7885 due to false positive") + gFlags := new(flags.GlobalFlags) + gFlags.MinimumPeersPerSubnet = 4 + flags.Init(gFlags) + // Reset config. + defer flags.Init(new(flags.GlobalFlags)) port := 2000 ipAddr, pkey := createAddrAndPrivKey(t) genesisTime := time.Now() @@ -111,11 +116,11 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { // look up 3 different subnets ctx := context.Background() - exists, err := s.FindPeersWithSubnet(ctx, "", 1, params.BeaconNetworkConfig().MinimumPeersInSubnet) + exists, err := s.FindPeersWithSubnet(ctx, "", 1, flags.Get().MinimumPeersPerSubnet) require.NoError(t, err) - exists2, err := s.FindPeersWithSubnet(ctx, "", 2, params.BeaconNetworkConfig().MinimumPeersInSubnet) + exists2, err := s.FindPeersWithSubnet(ctx, "", 2, flags.Get().MinimumPeersPerSubnet) require.NoError(t, err) - exists3, err := s.FindPeersWithSubnet(ctx, "", 3, params.BeaconNetworkConfig().MinimumPeersInSubnet) + exists3, err := s.FindPeersWithSubnet(ctx, "", 3, flags.Get().MinimumPeersPerSubnet) require.NoError(t, err) if !exists || !exists2 || !exists3 { t.Fatal("Peer with subnet doesn't exist") @@ -132,7 +137,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { testService.RefreshENR() time.Sleep(2 * time.Second) - exists, err = s.FindPeersWithSubnet(ctx, "", 2, params.BeaconNetworkConfig().MinimumPeersInSubnet) + exists, err = s.FindPeersWithSubnet(ctx, "", 2, flags.Get().MinimumPeersPerSubnet) require.NoError(t, err) assert.Equal(t, true, exists, "Peer with subnet doesn't exist") diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index cf70954f5c..20d9e64fb4 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -61,7 +61,7 @@ func (p *FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { } // FindPeersWithSubnet mocks the p2p func. -func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) { +func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { return false, nil } diff --git a/beacon-chain/p2p/testing/mock_peermanager.go b/beacon-chain/p2p/testing/mock_peermanager.go index b94157c568..7c4b7b7774 100644 --- a/beacon-chain/p2p/testing/mock_peermanager.go +++ b/beacon-chain/p2p/testing/mock_peermanager.go @@ -51,7 +51,7 @@ func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { func (m MockPeerManager) RefreshENR() {} // FindPeersWithSubnet . -func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) { +func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { return true, nil } diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 994fe76fcb..5819ad655d 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -349,7 +349,7 @@ func (p *TestP2P) Peers() *peers.Status { } // FindPeersWithSubnet mocks the p2p func. -func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) { +func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { return false, nil } diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 8b71c53963..d28b539dfe 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -316,7 +316,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.Vali s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), i, - params.BeaconNetworkConfig().MinimumPeersInSubnet, + flags.Get().MinimumPeersPerSubnet, ) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -431,7 +431,7 @@ func (s *Service) subscribeAggregatorSubnet( if !s.validPeersExist(subnetTopic) { log.Debugf("No peers found subscribed to attestation gossip subnet with "+ "committee index %d. Searching network for peers subscribed to the subnet.", idx) - _, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet) + _, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") } @@ -457,7 +457,7 @@ func (s *Service) subscribeSyncSubnet( if !s.validPeersExist(subnetTopic) { log.Debugf("No peers found subscribed to sync gossip subnet with "+ "committee index %d. Searching network for peers subscribed to the subnet.", idx) - _, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet) + _, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") } @@ -516,7 +516,7 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator pubsub. s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), i, - params.BeaconNetworkConfig().MinimumPeersInSubnet, + flags.Get().MinimumPeersPerSubnet, ) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -595,7 +595,7 @@ func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { log.Debugf("No peers found subscribed to attestation gossip subnet with "+ "committee index %d. Searching network for peers subscribed to the subnet.", idx) // perform a search for peers with the desired committee index. - _, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet) + _, err := s.cfg.P2P.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") } @@ -620,7 +620,7 @@ func (s *Service) unSubscribeFromTopic(topic string) { // find if we have peers who are subscribed to the same subnet func (s *Service) validPeersExist(subnetTopic string) bool { numOfPeers := s.cfg.P2P.PubSub().ListPeers(subnetTopic + s.cfg.P2P.Encoding().ProtocolSuffix()) - return uint64(len(numOfPeers)) >= params.BeaconNetworkConfig().MinimumPeersInSubnet + return len(numOfPeers) >= flags.Get().MinimumPeersPerSubnet } func (s *Service) retrievePersistentSubs(currSlot types.Slot) []uint64 { @@ -661,10 +661,10 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID { for _, sub := range wantedSubs { subnetTopic := fmt.Sprintf(topic, digest, sub) + s.cfg.P2P.Encoding().ProtocolSuffix() peers := s.cfg.P2P.PubSub().ListPeers(subnetTopic) - if len(peers) > int(params.BeaconNetworkConfig().MinimumPeersInSubnet) { + if len(peers) > flags.Get().MinimumPeersPerSubnet { // In the event we have more than the minimum, we can // mark the remaining as viable for pruning. - peers = peers[:params.BeaconNetworkConfig().MinimumPeersInSubnet] + peers = peers[:flags.Get().MinimumPeersPerSubnet] } // Add peer to peer map. for _, p := range peers { diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 1a3c23688f..7d2a5bd650 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -24,6 +24,7 @@ import ( p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing" lruwrpr "github.com/prysmaticlabs/prysm/cache/lru" + "github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/network/forks" pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/shared/bytesutil" @@ -433,6 +434,11 @@ func Test_wrapAndReportValidation(t *testing.T) { } func TestFilterSubnetPeers(t *testing.T) { + gFlags := new(flags.GlobalFlags) + gFlags.MinimumPeersPerSubnet = 4 + flags.Init(gFlags) + // Reset config. + defer flags.Init(new(flags.GlobalFlags)) p := p2ptest.NewTestP2P(t) ctx, cancel := context.WithCancel(context.Background()) currSlot := types.Slot(100) @@ -481,7 +487,7 @@ func TestFilterSubnetPeers(t *testing.T) { // Try with only peers from subnet 20. wantedPeers = []peer.ID{p2.BHost.ID()} // Connect an excess amount of peers in the particular subnet. - for i := uint64(1); i <= params.BeaconNetworkConfig().MinimumPeersInSubnet; i++ { + for i := 1; i <= flags.Get().MinimumPeersPerSubnet; i++ { nPeer := createPeer(t, subnet20) p.Connect(nPeer) wantedPeers = append(wantedPeers, nPeer.BHost.ID()) diff --git a/cmd/beacon-chain/flags/base.go b/cmd/beacon-chain/flags/base.go index 0b59fb4044..78c65d7002 100644 --- a/cmd/beacon-chain/flags/base.go +++ b/cmd/beacon-chain/flags/base.go @@ -143,10 +143,10 @@ var ( Name: "enable-debug-rpc-endpoints", Usage: "Enables the debug rpc service, containing utility endpoints such as /eth/v1alpha1/beacon/state.", } - // SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation subnets or not. + // SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation/sync subnets or not. SubscribeToAllSubnets = &cli.BoolFlag{ Name: "subscribe-all-subnets", - Usage: "Subscribe to all possible attestation subnets.", + Usage: "Subscribe to all possible attestation and sync subnets.", } // HistoricalSlasherNode is a set of beacon node flags required for performing historical detection with a slasher. HistoricalSlasherNode = &cli.BoolFlag{ @@ -182,4 +182,10 @@ var ( Usage: "Load a genesis state from ssz file. Testnet genesis files can be found in the " + "eth2-clients/eth2-testnets repository on github.", } + // MinPeersPerSubnet defines a flag to set the minimum number of peers that a node will attempt to peer with for a subnet. + MinPeersPerSubnet = &cli.Uint64Flag{ + Name: "minimum-peers-per-subnet", + Usage: "Sets the minimum number of peers that a node will attempt to peer with that are subscribed to a subnet.", + Value: 4, + } ) diff --git a/cmd/beacon-chain/flags/config.go b/cmd/beacon-chain/flags/config.go index dc1d87ba32..1f82b8ecaf 100644 --- a/cmd/beacon-chain/flags/config.go +++ b/cmd/beacon-chain/flags/config.go @@ -13,6 +13,7 @@ type GlobalFlags struct { DisableDiscv5 bool SubscribeToAllSubnets bool MinimumSyncPeers int + MinimumPeersPerSubnet int BlockBatchLimit int BlockBatchLimitBurstFactor int } @@ -51,6 +52,7 @@ func ConfigureGlobalFlags(ctx *cli.Context) { cfg.DisableDiscv5 = ctx.Bool(DisableDiscv5.Name) cfg.BlockBatchLimit = ctx.Int(BlockBatchLimit.Name) cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name) + cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name) configureMinimumPeers(ctx, cfg) Init(cfg) diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index b3e8170e0b..37e93a2f0d 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -62,6 +62,7 @@ var appFlags = []cli.Flag{ flags.WeakSubjectivityCheckpt, flags.Eth1HeaderReqLimit, flags.GenesisStatePath, + flags.MinPeersPerSubnet, cmd.EnableBackupWebhookFlag, cmd.BackupWebhookOutputDir, cmd.MinimalConfigFlag, diff --git a/cmd/beacon-chain/usage.go b/cmd/beacon-chain/usage.go index 6333a69fba..a0a51678f7 100644 --- a/cmd/beacon-chain/usage.go +++ b/cmd/beacon-chain/usage.go @@ -121,6 +121,7 @@ var appHelpFlagGroups = []flagGroup{ flags.WeakSubjectivityCheckpt, flags.Eth1HeaderReqLimit, flags.GenesisStatePath, + flags.MinPeersPerSubnet, }, }, { diff --git a/shared/params/mainnet_config.go b/shared/params/mainnet_config.go index 6119d5b005..403418d10a 100644 --- a/shared/params/mainnet_config.go +++ b/shared/params/mainnet_config.go @@ -40,7 +40,6 @@ var mainnetNetworkConfig = &NetworkConfig{ ETH2Key: "eth2", AttSubnetKey: "attnets", SyncCommsSubnetKey: "syncnets", - MinimumPeersInSubnet: 4, MinimumPeersInSubnetSearch: 20, ContractDeploymentBlock: 11184524, // Note: contract was deployed in block 11052984 but no transactions were sent until 11184524. BootstrapNodes: []string{ diff --git a/shared/params/network_config.go b/shared/params/network_config.go index 59245e923e..49e89adc4e 100644 --- a/shared/params/network_config.go +++ b/shared/params/network_config.go @@ -24,7 +24,6 @@ type NetworkConfig struct { ETH2Key string // ETH2Key is the ENR key of the Ethereum consensus object in an enr. AttSubnetKey string // AttSubnetKey is the ENR key of the subnet bitfield in the enr. SyncCommsSubnetKey string // SyncCommsSubnetKey is the ENR key of the sync committee subnet bitfield in the enr. - MinimumPeersInSubnet uint64 // MinimumPeersInSubnet is the required amount of peers that a node is to have its in subnet. MinimumPeersInSubnetSearch uint64 // PeersInSubnetSearch is the required amount of peers that we need to be able to lookup in a subnet search. // Chain Network Config diff --git a/shared/params/testnet_e2e_config.go b/shared/params/testnet_e2e_config.go index f99ea0d0c6..974de13b9a 100644 --- a/shared/params/testnet_e2e_config.go +++ b/shared/params/testnet_e2e_config.go @@ -7,10 +7,6 @@ func UseE2EConfig() { beaconConfig = E2ETestConfig() cfg := BeaconNetworkConfig().Copy() - // Due to the small number of peers in the e2e test network - // setting this to 0, prevents the node from being overwhelmed - // from discovery lookups. - cfg.MinimumPeersInSubnet = 0 OverrideBeaconNetworkConfig(cfg) } diff --git a/testing/endtoend/components/beacon_node.go b/testing/endtoend/components/beacon_node.go index 8e0e520253..6fee816342 100644 --- a/testing/endtoend/components/beacon_node.go +++ b/testing/endtoend/components/beacon_node.go @@ -116,6 +116,7 @@ func (node *BeaconNode) Start(ctx context.Context) error { fmt.Sprintf("--%s=%d", flags.GRPCGatewayPort.Name, e2e.TestParams.BeaconNodeRPCPort+index+40), fmt.Sprintf("--%s=%d", flags.EthApiPort.Name, e2e.TestParams.BeaconNodeRPCPort+index+30), fmt.Sprintf("--%s=%d", flags.ContractDeploymentBlock.Name, 0), + fmt.Sprintf("--%s=%d", flags.MinPeersPerSubnet.Name, 0), fmt.Sprintf("--%s=%d", cmdshared.RPCMaxPageSizeFlag.Name, params.BeaconConfig().MinGenesisActiveValidatorCount), fmt.Sprintf("--%s=%s", cmdshared.BootstrapNode.Name, enr), fmt.Sprintf("--%s=%s", cmdshared.VerbosityFlag.Name, "debug"),