diff --git a/beacon-chain/p2p/connection_gater.go b/beacon-chain/p2p/connection_gater.go index 896e2b95e7..21f728d732 100644 --- a/beacon-chain/p2p/connection_gater.go +++ b/beacon-chain/p2p/connection_gater.go @@ -78,7 +78,7 @@ func (s *Service) validateDial(addr multiaddr.Multiaddr) bool { return false } s.ipLimiter.Add(ip.String(), 1) - return !s.peers.IsAboveInboundLimit() + return true } // configureFilter looks at the provided allow lists and diff --git a/beacon-chain/p2p/connection_gater_test.go b/beacon-chain/p2p/connection_gater_test.go index 11e4632007..0af2bccae9 100644 --- a/beacon-chain/p2p/connection_gater_test.go +++ b/beacon-chain/p2p/connection_gater_test.go @@ -9,10 +9,12 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" + ma "github.com/multiformats/go-multiaddr" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers" + mockp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" ) @@ -102,6 +104,8 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) { PeerLimit: limit, ScorerParams: &scorers.Config{}, }), + host: mockp2p.NewTestP2P(t).BHost, + cfg: &Config{MaxPeers: uint(limit)}, } var err error s.addrFilter, err = configureFilter(&Config{}) @@ -110,19 +114,20 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) { multiAddress, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, 3000)) require.NoError(t, err) - valid := s.validateDial(multiAddress) + valid := s.InterceptAccept(&maEndpoints{raddr: multiAddress}) if !valid { t.Errorf("Expected multiaddress with ip %s to be accepted as it is below the inbound limit", ip) } inboundLimit := float64(limit) * peers.InboundRatio + inboundLimit += highWatermarkBuffer // top off by 1 to trigger it above the limit. inboundLimit += 1 // Add in up to inbound peer limit. for i := 0; i < int(inboundLimit); i++ { addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED)) } - valid = s.validateDial(multiAddress) + valid = s.InterceptAccept(&maEndpoints{raddr: multiAddress}) if valid { t.Errorf("Expected multiaddress with ip %s to be rejected as it exceeds the inbound limit", ip) } @@ -285,3 +290,21 @@ func TestService_InterceptAddrDial_Allow(t *testing.T) { t.Errorf("Expected multiaddress with ip %s to not be rejected with an allow cidr mask of %s", ip, cidr) } } + +// Mock type for testing. +type maEndpoints struct { + laddr ma.Multiaddr + raddr ma.Multiaddr +} + +// LocalMultiaddr returns the local address associated with +// this connection +func (c *maEndpoints) LocalMultiaddr() ma.Multiaddr { + return c.laddr +} + +// RemoteMultiaddr returns the remote address associated with +// this connection +func (c *maEndpoints) RemoteMultiaddr() ma.Multiaddr { + return c.raddr +} diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 309432166a..84d58f67df 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -288,9 +288,14 @@ func (s *Service) isPeerAtLimit(inbound bool) bool { // we apply the high watermark buffer. if inbound { maxPeers += highWatermarkBuffer + maxInbound := s.peers.InboundLimit() + highWatermarkBuffer + currInbound := len(s.peers.Inbound()) + // Exit early if we are at the inbound limit. + if currInbound >= maxInbound { + return true + } } activePeers := len(s.Peers().Active()) - return activePeers >= maxPeers || numOfConns >= maxPeers } diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index 2c165f71ad..2684eecab0 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -62,7 +62,7 @@ const ( maxLimitBuffer = 150 // InboundRatio is the proportion of our connected peer limit at which we will allow inbound peers. - InboundRatio = float64(1) + InboundRatio = float64(0.8) ) // Status is the structure holding the peer status information. @@ -208,6 +208,14 @@ func (p *Status) IsAboveInboundLimit() bool { return totalInbound > inboundLimit } +// InboundLimit returns the current inbound +// peer limit. +func (p *Status) InboundLimit() int { + p.store.RLock() + defer p.store.RUnlock() + return int(float64(p.ConnectedPeerLimit()) * InboundRatio) +} + // SetMetadata sets the metadata of the given remote peer. func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) { p.store.Lock() @@ -662,7 +670,7 @@ func (p *Status) PeersToPrune() []peer.ID { badResp int } peersToPrune := make([]*peerResp, 0) - // Select disconnected peers with a smaller bad response count. + // Select connected and inbound peers to prune. for pid, peerData := range p.store.Peers() { if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound { peersToPrune = append(peersToPrune, &peerResp{