mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Add Buffer For Inbound Peers (#8018)
* allow inbound peers * comment * gaz * gaz * Update deps.bzl Co-authored-by: Shay Zluf <thezluf@gmail.com> * add test * re-arrange imports * fix up * fix tests * gaz * Update beacon-chain/p2p/service_test.go * fmt Co-authored-by: Victor Farazdagi <simple.square@gmail.com> Co-authored-by: Shay Zluf <thezluf@gmail.com> Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
@@ -80,7 +80,6 @@ go_library(
|
||||
"@com_github_libp2p_go_libp2p_noise//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_secio//:go_default_library",
|
||||
"@com_github_multiformats_go_multiaddr//:go_default_library",
|
||||
"@com_github_multiformats_go_multiaddr_net//:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
@@ -123,6 +122,7 @@ go_test(
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/p2p/encoder:go_default_library",
|
||||
"//beacon-chain/p2p/peers:go_default_library",
|
||||
"//beacon-chain/p2p/peers/peerdata:go_default_library",
|
||||
"//beacon-chain/p2p/peers/scorers:go_default_library",
|
||||
"//beacon-chain/p2p/testing:go_default_library",
|
||||
"//beacon-chain/p2p/types:go_default_library",
|
||||
@@ -151,6 +151,7 @@ go_test(
|
||||
"@com_github_libp2p_go_libp2p_core//host:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_noise//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_swarm//testing:go_default_library",
|
||||
|
||||
@@ -13,11 +13,17 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// limit for rate limiter when processing new inbound dials.
|
||||
const ipLimit = 4
|
||||
const (
|
||||
// Limit for rate limiter when processing new inbound dials.
|
||||
ipLimit = 4
|
||||
|
||||
// burst limit for inbound dials.
|
||||
const ipBurst = 8
|
||||
// Burst limit for inbound dials.
|
||||
ipBurst = 8
|
||||
|
||||
// High watermark buffer signifies the buffer till which
|
||||
// we will handle inbound requests.
|
||||
highWatermarkBuffer = 10
|
||||
)
|
||||
|
||||
// InterceptPeerDial tests whether we're permitted to Dial the specified peer.
|
||||
func (s *Service) InterceptPeerDial(_ peer.ID) (allow bool) {
|
||||
@@ -44,7 +50,7 @@ func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) {
|
||||
"reason": "exceeded dial limit"}).Trace("Not accepting inbound dial from ip address")
|
||||
return false
|
||||
}
|
||||
if s.isPeerAtLimit() {
|
||||
if s.isPeerAtLimit(true /* inbound */) {
|
||||
log.WithFields(logrus.Fields{"peer": n.RemoteMultiaddr(),
|
||||
"reason": "at peer limit"}).Trace("Not accepting inbound dial")
|
||||
return false
|
||||
|
||||
@@ -44,6 +44,10 @@ func TestPeer_AtMaxLimit(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
for i := 0; i < highWatermarkBuffer; i++ {
|
||||
addPeer(t, s.peers, peers.PeerConnected)
|
||||
}
|
||||
|
||||
// create alternate host
|
||||
listen, err = multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr2, 3000))
|
||||
require.NoError(t, err, "Failed to p2p listen")
|
||||
|
||||
@@ -68,7 +68,7 @@ func (s *Service) listenForNewNodes() {
|
||||
if s.ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
if s.isPeerAtLimit() {
|
||||
if s.isPeerAtLimit(false /* inbound */) {
|
||||
// Pause the main loop for a period to stop looking
|
||||
// for new peers.
|
||||
log.Trace("Not looking for peers, at peer limit")
|
||||
@@ -276,9 +276,14 @@ func (s *Service) filterPeer(node *enode.Node) bool {
|
||||
// This checks our set max peers in our config, and
|
||||
// determines whether our currently connected and
|
||||
// active peers are above our set max peer limit.
|
||||
func (s *Service) isPeerAtLimit() bool {
|
||||
func (s *Service) isPeerAtLimit(inbound bool) bool {
|
||||
numOfConns := len(s.host.Network().Peers())
|
||||
maxPeers := int(s.cfg.MaxPeers)
|
||||
// If we are measuring the limit for inbound peers
|
||||
// we apply the high watermark buffer.
|
||||
if inbound {
|
||||
maxPeers += highWatermarkBuffer
|
||||
}
|
||||
activePeers := len(s.Peers().Active())
|
||||
|
||||
return activePeers >= maxPeers || numOfConns >= maxPeers
|
||||
|
||||
@@ -14,10 +14,21 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/kevinms/leakybucket-go"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
|
||||
testp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/iputils"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
@@ -226,3 +237,48 @@ func TestHostIsResolved(t *testing.T) {
|
||||
newIP := list.Self().IP()
|
||||
assert.Equal(t, exampleIP, newIP.String(), "Did not resolve to expected IP")
|
||||
}
|
||||
|
||||
func TestInboundPeerLimit(t *testing.T) {
|
||||
fakePeer := testp2p.NewTestP2P(t)
|
||||
s := &Service{
|
||||
cfg: &Config{MaxPeers: 30},
|
||||
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
|
||||
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
|
||||
PeerLimit: 30,
|
||||
ScorerParams: &scorers.Config{},
|
||||
}),
|
||||
host: fakePeer.BHost,
|
||||
}
|
||||
|
||||
for i := 0; i < 30; i++ {
|
||||
_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
|
||||
}
|
||||
|
||||
require.Equal(t, true, s.isPeerAtLimit(false), "not at limit for outbound peers")
|
||||
require.Equal(t, false, s.isPeerAtLimit(true), "at limit for inbound peers")
|
||||
|
||||
for i := 0; i < highWatermarkBuffer; i++ {
|
||||
_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
|
||||
}
|
||||
|
||||
require.Equal(t, true, s.isPeerAtLimit(true), "not at limit for inbound peers")
|
||||
}
|
||||
|
||||
// addPeer is a helper to add a peer with a given connection state)
|
||||
func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID {
|
||||
// Set up some peers with different states
|
||||
mhBytes := []byte{0x11, 0x04}
|
||||
idBytes := make([]byte, 4)
|
||||
_, err := rand.Read(idBytes)
|
||||
require.NoError(t, err)
|
||||
mhBytes = append(mhBytes, idBytes...)
|
||||
id, err := peer.IDFromBytes(mhBytes)
|
||||
require.NoError(t, err)
|
||||
p.Add(new(enr.Record), id, nil, network.DirUnknown)
|
||||
p.SetConnectionState(id, state)
|
||||
p.SetMetadata(id, &pb.MetaData{
|
||||
SeqNumber: 0,
|
||||
Attnets: bitfield.NewBitvector64(),
|
||||
})
|
||||
return id
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
noise "github.com/libp2p/go-libp2p-noise"
|
||||
secio "github.com/libp2p/go-libp2p-secio"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/shared/version"
|
||||
@@ -37,7 +36,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Opt
|
||||
libp2p.ConnectionGater(s),
|
||||
}
|
||||
|
||||
options = append(options, libp2p.Security(noise.ID, noise.New), libp2p.Security(secio.ID, secio.New))
|
||||
options = append(options, libp2p.Security(noise.ID, noise.New))
|
||||
|
||||
if cfg.EnableUPnP {
|
||||
options = append(options, libp2p.NATPortMap()) //Allow to use UPnP
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
noise "github.com/libp2p/go-libp2p-noise"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
@@ -72,7 +73,7 @@ func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
|
||||
ipAddr := net.ParseIP("127.0.0.1")
|
||||
listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port))
|
||||
require.NoError(t, err, "Failed to p2p listen")
|
||||
h, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen)}...)
|
||||
h, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen), libp2p.Security(noise.ID, noise.New)}...)
|
||||
require.NoError(t, err)
|
||||
return h, pkey, ipAddr
|
||||
}
|
||||
|
||||
7
deps.bzl
7
deps.bzl
@@ -799,13 +799,6 @@ def prysm_deps():
|
||||
sum = "h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o=",
|
||||
version = "v2.1.0+incompatible",
|
||||
)
|
||||
|
||||
go_repository(
|
||||
name = "com_github_gofrs_flock",
|
||||
importpath = "github.com/gofrs/flock",
|
||||
sum = "h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=",
|
||||
version = "v0.7.1",
|
||||
)
|
||||
go_repository(
|
||||
name = "com_github_golang_freetype",
|
||||
importpath = "github.com/golang/freetype",
|
||||
|
||||
1
go.mod
1
go.mod
@@ -60,7 +60,6 @@ require (
|
||||
github.com/libp2p/go-libp2p-core v0.6.1
|
||||
github.com/libp2p/go-libp2p-noise v0.1.1
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.6
|
||||
github.com/libp2p/go-libp2p-secio v0.2.2
|
||||
github.com/libp2p/go-libp2p-swarm v0.2.8
|
||||
github.com/libp2p/go-libp2p-tls v0.1.4-0.20200421131144-8a8ad624a291 // indirect
|
||||
github.com/libp2p/go-mplex v0.1.3 // indirect
|
||||
|
||||
Reference in New Issue
Block a user