From 11bbf06d0367e0fbef78aeab4d3e8f939d39a360 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Sat, 12 Dec 2020 11:46:55 +0800 Subject: [PATCH] Add Inbound Peer Limit (#7942) * add changes * fix up * fix * add test * fix test * fix again Co-authored-by: Preston Van Loon --- beacon-chain/p2p/BUILD.bazel | 1 + beacon-chain/p2p/connection_gater.go | 2 +- beacon-chain/p2p/connection_gater_test.go | 37 +++++++++++++++++++ beacon-chain/p2p/discovery_test.go | 2 +- beacon-chain/p2p/peers/BUILD.bazel | 1 + beacon-chain/p2p/peers/status.go | 31 +++++++++++++++- beacon-chain/p2p/peers/status_test.go | 44 ++++++++++++++++++++--- 7 files changed, 111 insertions(+), 7 deletions(-) diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index dd338ad6d7..5e960f3b1a 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -158,6 +158,7 @@ go_test( "@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library", "@com_github_libp2p_go_libp2p_swarm//testing:go_default_library", "@com_github_multiformats_go_multiaddr//:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/beacon-chain/p2p/connection_gater.go b/beacon-chain/p2p/connection_gater.go index 001ce36a69..477fb43291 100644 --- a/beacon-chain/p2p/connection_gater.go +++ b/beacon-chain/p2p/connection_gater.go @@ -79,7 +79,7 @@ func (s *Service) validateDial(addr multiaddr.Multiaddr) bool { return false } s.ipLimiter.Add(ip.String(), 1) - return true + return !s.peers.IsAboveInboundLimit() } // configureFilter looks at the provided allow lists and diff --git a/beacon-chain/p2p/connection_gater_test.go b/beacon-chain/p2p/connection_gater_test.go index a7b3341572..11e4632007 100644 --- a/beacon-chain/p2p/connection_gater_test.go +++ b/beacon-chain/p2p/connection_gater_test.go @@ -9,7 +9,9 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" @@ -69,6 +71,7 @@ func TestService_InterceptBannedIP(t *testing.T) { s := &Service{ ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 20, ScorerParams: &scorers.Config{}, }), } @@ -91,6 +94,40 @@ func TestService_InterceptBannedIP(t *testing.T) { } } +func TestService_RejectInboundPeersBeyondLimit(t *testing.T) { + limit := 20 + s := &Service{ + ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false), + peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: limit, + ScorerParams: &scorers.Config{}, + }), + } + var err error + s.addrFilter, err = configureFilter(&Config{}) + require.NoError(t, err) + ip := "212.67.10.122" + multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, 3000)) + require.NoError(t, err) + + valid := s.validateDial(multiAddress) + if !valid { + t.Errorf("Expected multiaddress with ip %s to be accepted as it is below the inbound limit", ip) + } + + inboundLimit := float64(limit) * peers.InboundRatio + // top off by 1 to trigger it above the limit. + inboundLimit += 1 + // Add in up to inbound peer limit. + for i := 0; i < int(inboundLimit); i++ { + addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + } + valid = s.validateDial(multiAddress) + if valid { + t.Errorf("Expected multiaddress with ip %s to be rejected as it exceeds the inbound limit", ip) + } +} + func TestPeer_BelowMaxLimit(t *testing.T) { // create host and remote peer ipAddr, pkey := createAddrAndPrivKey(t) diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 00a31988d5..b462edfacf 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -274,7 +274,7 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) mhBytes = append(mhBytes, idBytes...) id, err := peer.IDFromBytes(mhBytes) require.NoError(t, err) - p.Add(new(enr.Record), id, nil, network.DirUnknown) + p.Add(new(enr.Record), id, nil, network.DirInbound) p.SetConnectionState(id, state) p.SetMetadata(id, &pb.MetaData{ SeqNumber: 0, diff --git a/beacon-chain/p2p/peers/BUILD.bazel b/beacon-chain/p2p/peers/BUILD.bazel index 1b05faebf3..76fd9ee341 100644 --- a/beacon-chain/p2p/peers/BUILD.bazel +++ b/beacon-chain/p2p/peers/BUILD.bazel @@ -44,6 +44,7 @@ go_test( "@com_github_libp2p_go_libp2p_core//network:go_default_library", "@com_github_libp2p_go_libp2p_core//peer:go_default_library", "@com_github_multiformats_go_multiaddr//:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index a49cbe2965..65d5168c5c 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -55,10 +55,13 @@ const ( const ( // ColocationLimit restricts how many peer identities we can see from a single ip or ipv6 subnet. - ColocationLimit = 5 + ColocationLimit = 7 // Additional buffer beyond current peer limit, from which we can store the relevant peer statuses. maxLimitBuffer = 150 + + // InboundRatio is the proportion of our connected peer limit at which we will allow inbound peers. + InboundRatio = float64(2) / 5 ) // Status is the structure holding the peer status information. @@ -188,6 +191,22 @@ func (p *Status) IsActive(pid peer.ID) bool { return ok && (peerData.ConnState == PeerConnected || peerData.ConnState == PeerConnecting) } +// IsAboveInboundLimit checks if we are above our current inbound +// peer limit. +func (p *Status) IsAboveInboundLimit() bool { + p.store.RLock() + defer p.store.RUnlock() + totalInbound := 0 + for _, peerData := range p.store.Peers() { + if peerData.ConnState == PeerConnected && + peerData.Direction == network.DirInbound { + totalInbound += 1 + } + } + inboundLimit := int(float64(p.ConnectedPeerLimit()) * InboundRatio) + return totalInbound > inboundLimit +} + // SetMetadata sets the metadata of the given remote peer. func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) { p.store.Lock() @@ -582,6 +601,16 @@ func (p *Status) HighestEpoch() uint64 { return helpers.SlotToEpoch(highestSlot) } +// ConnectedPeerLimit returns the peer limit of +// concurrent peers connected to the beacon-node. +func (p *Status) ConnectedPeerLimit() uint64 { + maxLim := p.MaxPeerLimit() + if maxLim <= maxLimitBuffer { + return 0 + } + return uint64(maxLim) - maxLimitBuffer +} + func (p *Status) isfromBadIP(pid peer.ID) bool { p.store.RLock() defer p.store.RUnlock() diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go index 03b55b6474..596f4a490e 100644 --- a/beacon-chain/p2p/peers/status_test.go +++ b/beacon-chain/p2p/peers/status_test.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1" "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata" @@ -537,7 +538,7 @@ func TestPeerIPTracker(t *testing.T) { if err != nil { t.Fatal(err) } - badPeers = append(badPeers, createPeer(t, p, addr)) + badPeers = append(badPeers, createPeer(t, p, addr, network.DirUnknown, peerdata.PeerConnectionState(ethpb.ConnectionState_DISCONNECTED))) } for _, pr := range badPeers { assert.Equal(t, true, p.IsBad(pr), "peer with bad ip is not bad") @@ -547,7 +548,7 @@ func TestPeerIPTracker(t *testing.T) { // from the peer store. for i := 0; i < p.MaxPeerLimit()+100; i++ { // Peer added to peer handler. - pid := addPeer(t, p, peers.PeerConnected) + pid := addPeer(t, p, peers.PeerDisconnected) p.Scorers().BadResponsesScorer().Increment(pid) } p.Prune() @@ -623,6 +624,39 @@ func TestTrimmedOrderedPeers(t *testing.T) { assert.Equal(t, pid1, pids[2], "Incorrect third peer") } +func TestConcurrentPeerLimitHolds(t *testing.T) { + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &scorers.Config{ + BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ + Threshold: 1, + }, + }, + }) + assert.Equal(t, true, uint64(p.MaxPeerLimit()) > p.ConnectedPeerLimit(), "max peer limit doesnt exceed connected peer limit") +} + +func TestAtInboundPeerLimit(t *testing.T) { + p := peers.NewStatus(context.Background(), &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &scorers.Config{ + BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{ + Threshold: 1, + }, + }, + }) + for i := 0; i < 15; i++ { + // Peer added to peer handler. + createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + } + assert.Equal(t, false, p.IsAboveInboundLimit(), "Inbound limit exceeded") + for i := 0; i < 15; i++ { + // Peer added to peer handler. + createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) + } + assert.Equal(t, true, p.IsAboveInboundLimit(), "Inbound limit not exceeded") +} + func TestStatus_BestPeer(t *testing.T) { type peerConfig struct { headSlot uint64 @@ -874,7 +908,8 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) return id } -func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr) peer.ID { +func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr, + dir network.Direction, state peerdata.PeerConnectionState) peer.ID { mhBytes := []byte{0x11, 0x04} idBytes := make([]byte, 4) _, err := rand.Read(idBytes) @@ -882,6 +917,7 @@ func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr) peer.ID { mhBytes = append(mhBytes, idBytes...) id, err := peer.IDFromBytes(mhBytes) require.NoError(t, err) - p.Add(new(enr.Record), id, addr, network.DirUnknown) + p.Add(new(enr.Record), id, addr, dir) + p.SetConnectionState(id, state) return id }