From 0625a6906c211ee33ea0ccad8bbd9548d0d292a7 Mon Sep 17 00:00:00 2001
From: Nishant Das
Date: Thu, 25 Feb 2021 02:40:06 +0800
Subject: [PATCH] Add Better Peer Pruning (#8501)

* add better pruning

* add test

* gaz

* victor's review

Co-authored-by: Raul Jordan
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
---
 beacon-chain/cache/subnet_ids.go     | 19 +++++++
 beacon-chain/p2p/peers/status.go     | 15 +++++-
 beacon-chain/sync/BUILD.bazel        |  1 +
 beacon-chain/sync/rpc_status.go      |  1 +
 beacon-chain/sync/subscriber.go      | 69 ++++++++++++++++++++++---
 beacon-chain/sync/subscriber_test.go | 76 ++++++++++++++++++++++++++++
 6 files changed, 172 insertions(+), 9 deletions(-)

diff --git a/beacon-chain/cache/subnet_ids.go b/beacon-chain/cache/subnet_ids.go
index c1c2c84721..f2f95a6554 100644
--- a/beacon-chain/cache/subnet_ids.go
+++ b/beacon-chain/cache/subnet_ids.go
@@ -133,3 +133,22 @@ func (s *subnetIDs) AddPersistentCommittee(pubkey []byte, comIndex []uint64, dur
 	s.persistentSubnets.Set(string(pubkey), comIndex, duration)
 }
+
+// EmptyAllCaches empties out all the related caches and flushes any stored
+// entries in them. This should only ever be used for testing; in normal
+// production, the relevant subnets for each role are handled separately.
+func (s *subnetIDs) EmptyAllCaches() {
+	// Clear the caches.
+	s.attesterLock.Lock()
+	s.attester.Purge()
+	s.attesterLock.Unlock()
+
+	s.aggregatorLock.Lock()
+	s.aggregator.Purge()
+	s.aggregatorLock.Unlock()
+
+	s.subnetsLock.Lock()
+	s.persistentSubnets.Flush()
+	s.subnetsLock.Unlock()
+}
diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go
index 2684eecab0..48d68f80cb 100644
--- a/beacon-chain/p2p/peers/status.go
+++ b/beacon-chain/p2p/peers/status.go
@@ -656,7 +656,9 @@ func (p *Status) BestNonFinalized(minPeers int, ourHeadEpoch types.Epoch) (types
 // to determine the most suitable peers to take out.
 func (p *Status) PeersToPrune() []peer.ID {
 	connLimit := p.ConnectedPeerLimit()
+	inBoundLimit := p.InboundLimit()
 	activePeers := p.Active()
+	numInboundPeers := len(p.InboundConnected())
 	// Exit early if we are still below our max
 	// limit.
 	if len(activePeers) <= int(connLimit) {
@@ -672,7 +674,8 @@ func (p *Status) PeersToPrune() []peer.ID {
 	peersToPrune := make([]*peerResp, 0)
 	// Select connected and inbound peers to prune.
 	for pid, peerData := range p.store.Peers() {
-		if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound {
+		if peerData.ConnState == PeerConnected &&
+			peerData.Direction == network.DirInbound {
 			peersToPrune = append(peersToPrune, &peerResp{
 				pid:     pid,
 				badResp: peerData.BadResponses,
@@ -690,6 +693,16 @@ func (p *Status) PeersToPrune() []peer.ID {
 	// max connection limit.
 	amountToPrune := len(activePeers) - int(connLimit)

+	// Also check for inbound peers above our limit.
+	excessInbound := 0
+	if numInboundPeers > inBoundLimit {
+		excessInbound = numInboundPeers - inBoundLimit
+	}
+	// Prune whichever amount is larger: the excess over the total
+	// connection limit or the excess over the inbound limit.
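+	// Hypothetical example: with connLimit = 50, inBoundLimit = 37, and 60
+	// active peers of which 46 are inbound, amountToPrune is 10 and
+	// excessInbound is 9, so 10 peers are pruned. If 49 of the 60 were
+	// inbound instead, excessInbound would rise to 12 and win out.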
+	if excessInbound > amountToPrune {
+		amountToPrune = excessInbound
+	}
 	if amountToPrune < len(peersToPrune) {
 		peersToPrune = peersToPrune[:amountToPrune]
 	}
diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel
index ae38f8b406..f3abb0f63e 100644
--- a/beacon-chain/sync/BUILD.bazel
+++ b/beacon-chain/sync/BUILD.bazel
@@ -136,6 +136,7 @@ go_test(
     shard_count = 4,
     deps = [
         "//beacon-chain/blockchain/testing:go_default_library",
+        "//beacon-chain/cache:go_default_library",
        "//beacon-chain/core/feed:go_default_library",
         "//beacon-chain/core/feed/state:go_default_library",
         "//beacon-chain/core/helpers:go_default_library",
diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go
index a2d5ce22ea..0a47726419 100644
--- a/beacon-chain/sync/rpc_status.go
+++ b/beacon-chain/sync/rpc_status.go
@@ -68,6 +68,7 @@ func (s *Service) maintainPeerStatuses() {
 		// pruning excess peers.
 		wg.Wait()
 		peerIds := s.p2p.Peers().PeersToPrune()
+		peerIds = s.filterNeededPeers(peerIds)
 		for _, id := range peerIds {
 			if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeTooManyPeers, id); err != nil {
 				log.WithField("peer", id).WithError(err).Debug("Could not disconnect with peer")
diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go
index 8310606e0f..ca82998052 100644
--- a/beacon-chain/sync/subscriber.go
+++ b/beacon-chain/sync/subscriber.go
@@ -11,6 +11,7 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/libp2p/go-libp2p-core/peer"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	types "github.com/prysmaticlabs/eth2-types"
 	pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
 	"github.com/prysmaticlabs/prysm/beacon-chain/flags"
 	"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
@@ -264,14 +265,7 @@ func (s *Service) subscribeDynamicWithSubnets(
 		if s.chainStarted.IsSet() && s.initialSync.Syncing() {
 			continue
 		}
-
-		// Persistent subscriptions from validators
-		persistentSubs := s.persistentSubnetIndices()
-		// Update desired topic indices for aggregator
-		wantedSubs := s.aggregatorSubnetIndices(currentSlot)
-
-		// Combine subscriptions to get all requested subscriptions
-		wantedSubs = sliceutil.SetUint64(append(persistentSubs, wantedSubs...))
+		wantedSubs := s.retrievePersistentSubs(currentSlot)
 		// Resize as appropriate.
 		s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
@@ -358,6 +352,65 @@ func (s *Service) validPeersExist(subnetTopic string) bool {
 	return uint64(len(numOfPeers)) >= params.BeaconNetworkConfig().MinimumPeersInSubnet
 }

+func (s *Service) retrievePersistentSubs(currSlot types.Slot) []uint64 {
+	// Persistent subscriptions from validators.
+	persistentSubs := s.persistentSubnetIndices()
+	// Desired topic indices for the aggregator.
+	wantedSubs := s.aggregatorSubnetIndices(currSlot)
+
+	// Combine the two sets to get all requested subscriptions.
+	return sliceutil.SetUint64(append(persistentSubs, wantedSubs...))
+}
+
+// filterNeededPeers filters out peers that the node requires to function,
+// so that peers serving our attestation subnets are not pruned.
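+// The wanted subnets are the union of the persistent, aggregator, and
+// attester subnets for the current slot; for each wanted subnet, up to
+// MinimumPeersInSubnet currently connected peers are treated as needed
+// and removed from the pruning candidates.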
+func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
+	// Exit early if there is nothing to filter.
+	if len(pids) == 0 {
+		return pids
+	}
+	digest, err := s.forkDigest()
+	if err != nil {
+		log.WithError(err).Error("Could not compute fork digest")
+		return pids
+	}
+	currSlot := s.chain.CurrentSlot()
+	wantedSubs := s.retrievePersistentSubs(currSlot)
+	wantedSubs = sliceutil.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...))
+	topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
+
+	// Map of peers in our wanted subnets.
+	peerMap := make(map[peer.ID]bool)
+
+	for _, sub := range wantedSubs {
+		subnetTopic := fmt.Sprintf(topic, digest, sub) + s.p2p.Encoding().ProtocolSuffix()
+		peers := s.p2p.PubSub().ListPeers(subnetTopic)
+		if len(peers) > int(params.BeaconNetworkConfig().MinimumPeersInSubnet) {
+			// If we have more than the minimum, keep only the first
+			// minimum-many as needed and mark the rest as viable for pruning.
+			peers = peers[:params.BeaconNetworkConfig().MinimumPeersInSubnet]
+		}
+		// Add the peers to the peer map. Even if a peer ID has already
+		// been seen, setting it again is harmless, as the outcome is
+		// the same.
+		for _, p := range peers {
+			peerMap[p] = true
+		}
+	}
+
+	// Remove the needed peers from the pruning candidates.
+	newPeers := make([]peer.ID, 0, len(pids))
+
+	for _, pid := range pids {
+		if peerMap[pid] {
+			continue
+		}
+		newPeers = append(newPeers, pid)
+	}
+	return newPeers
+}
+
 // Add fork digest to topic.
 func (s *Service) addDigestToTopic(topic string) string {
 	if !strings.Contains(topic, "%x") {
diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go
index 75b9cf1d9b..e38100c688 100644
--- a/beacon-chain/sync/subscriber_test.go
+++ b/beacon-chain/sync/subscriber_test.go
@@ -13,8 +13,10 @@ import (
 	"github.com/libp2p/go-libp2p-core/peer"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
+	types "github.com/prysmaticlabs/eth2-types"
 	pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
 	mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
+	"github.com/prysmaticlabs/prysm/beacon-chain/cache"
 	db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
 	"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
 	"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
@@ -356,3 +358,77 @@ func Test_wrapAndReportValidation(t *testing.T) {
 		})
 	}
 }
+
+func TestFilterSubnetPeers(t *testing.T) {
+	p := p2ptest.NewTestP2P(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	currSlot := types.Slot(100)
+	r := Service{
+		ctx: ctx,
+		chain: &mockChain.ChainService{
+			Genesis:        time.Now(),
+			ValidatorsRoot: [32]byte{'A'},
+			Slot:           &currSlot,
+		},
+		p2p:          p,
+		chainStarted: abool.New(),
+	}
+	// Empty the caches at the end of the test.
+	defer cache.SubnetIDs.EmptyAllCaches()
+
+	defaultTopic := "/eth2/%x/beacon_attestation_%d" + r.p2p.Encoding().ProtocolSuffix()
+	subnet10 := r.addDigestAndIndexToTopic(defaultTopic, 10)
+	cache.SubnetIDs.AddAggregatorSubnetID(currSlot, 10)
+
+	subnet20 := r.addDigestAndIndexToTopic(defaultTopic, 20)
+	cache.SubnetIDs.AddAttesterSubnetID(currSlot, 20)
+
+	p1 := createPeer(t, subnet10)
+	p2 := createPeer(t, subnet10, subnet20)
+	p3 := createPeer(t)
+
+	// Connect to all peers.
+	p.Connect(p1)
+	p.Connect(p2)
+	p.Connect(p3)
+
+	// Sleep a while to allow the peers to connect.
+	time.Sleep(100 * time.Millisecond)
+
+	wantedPeers := []peer.ID{p1.PeerID(), p2.PeerID(), p3.PeerID()}
+	// Expect only peer 3 to be marked as suitable for pruning.
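+	// Peers 1 and 2 serve wanted subnets (10 and 20) and should be kept,
+	// while peer 3 serves none of them and is the only viable candidate.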
+	recPeers := r.filterNeededPeers(wantedPeers)
+	assert.DeepEqual(t, []peer.ID{p3.PeerID()}, recPeers)
+
+	// Try with only peers from subnet 20.
+	wantedPeers = []peer.ID{p2.BHost.ID()}
+	// Connect an excess number of peers to the particular subnet.
+	for i := uint64(1); i <= params.BeaconNetworkConfig().MinimumPeersInSubnet; i++ {
+		nPeer := createPeer(t, subnet20)
+		p.Connect(nPeer)
+		wantedPeers = append(wantedPeers, nPeer.BHost.ID())
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	recPeers = r.filterNeededPeers(wantedPeers)
+	assert.DeepEqual(t, 1, len(recPeers), "expected exactly 1 peer to be viable for pruning")
+
+	cancel()
+}
+
+// createPeer creates a test peer and subscribes it to the provided topics.
+func createPeer(t *testing.T, topics ...string) *p2ptest.TestP2P {
+	p := p2ptest.NewTestP2P(t)
+	for _, tp := range topics {
+		jTop, err := p.PubSub().Join(tp)
+		if err != nil {
+			t.Fatal(err)
+		}
+		_, err = jTop.Subscribe()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	return p
+}
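For reference, the filtering in filterNeededPeers boils down to a small set
computation. A minimal, self-contained sketch of the same idea, using plain
strings in place of peer.IDs (illustrative names, not the Prysm API):

package main

import "fmt"

// filterNeeded returns the pruning candidates that are not protected by any
// wanted subnet: for every subnet, the first minPerSubnet connected peers
// are protected, and anything unprotected remains prunable.
func filterNeeded(candidates []string, subnetPeers map[uint64][]string, minPerSubnet int) []string {
	protected := make(map[string]bool)
	for _, peers := range subnetPeers {
		if len(peers) > minPerSubnet {
			peers = peers[:minPerSubnet]
		}
		for _, p := range peers {
			protected[p] = true
		}
	}
	pruned := make([]string, 0, len(candidates))
	for _, c := range candidates {
		if !protected[c] {
			pruned = append(pruned, c)
		}
	}
	return pruned
}

func main() {
	// Mirrors the test above: p1 and p2 serve wanted subnets, p3 does not.
	subnetPeers := map[uint64][]string{
		10: {"p1", "p2"},
		20: {"p2"},
	}
	fmt.Println(filterNeeded([]string{"p1", "p2", "p3"}, subnetPeers, 6))
	// Prints: [p3]
}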