From 62a59318431649258147fedcdc2943509f280c84 Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Thu, 23 Jan 2020 17:33:43 -0800 Subject: [PATCH] Use a client side rate limit to reduce chance of getting banned (#4637) * Use a client side rate limit to reduce chance of getting banned * fix test Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/sync/initial-sync/BUILD.bazel | 2 ++ beacon-chain/sync/initial-sync/round_robin.go | 5 ++++ .../sync/initial-sync/round_robin_test.go | 12 ++++---- beacon-chain/sync/initial-sync/service.go | 29 +++++++++++-------- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 3554373e45..bca26ddf14 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//shared/mathutil:go_default_library", "//shared/params:go_default_library", "//shared/roughtime:go_default_library", + "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p_core//peer:go_default_library", "@com_github_paulbellamy_ratecounter//:go_default_library", "@com_github_pkg_errors//:go_default_library", @@ -51,6 +52,7 @@ go_test( "//shared/params:go_default_library", "//shared/roughtime:go_default_library", "//shared/sliceutil:go_default_library", + "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p_core//network:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_prysmaticlabs_go_ssz//:go_default_library", diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index bc5b6a9be3..696c864089 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -274,6 +274,11 @@ func (s *Service) roundRobinSync(genesis time.Time) error { // requestBlocks by range to a specific peer. func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID) ([]*eth.SignedBeaconBlock, error) { + if s.blocksRateLimiter.Remaining(pid.String()) < int64(req.Count) { + log.WithField("peer", pid).Debug("Slowing down for rate limit") + time.Sleep(s.blocksRateLimiter.TillEmpty(pid.String())) + } + s.blocksRateLimiter.Add(pid.String(), int64(req.Count)) log.WithFields(logrus.Fields{ "peer": pid, "start": req.StartSlot, diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index e293ae4556..88d407cf0a 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p-core/network" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" @@ -255,11 +256,12 @@ func TestRoundRobinSync(t *testing.T) { DB: beaconDB, } // no-op mock s := &Service{ - chain: mc, - p2p: p, - db: beaconDB, - synced: false, - chainStarted: true, + chain: mc, + p2p: p, + db: beaconDB, + synced: false, + chainStarted: true, + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), } if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil { t.Error(err) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 9cc7c8834d..7ad058bd8d 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/kevinms/leakybucket-go" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" @@ -27,6 +28,8 @@ type blockchainService interface { const ( handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes. + + allowedBlocksPerSecond = 32.0 ) // Config to set up the initial sync service. @@ -39,24 +42,26 @@ type Config struct { // Service service. type Service struct { - ctx context.Context - chain blockchainService - p2p p2p.P2P - db db.ReadOnlyDatabase - synced bool - chainStarted bool - stateNotifier statefeed.Notifier + ctx context.Context + chain blockchainService + p2p p2p.P2P + db db.ReadOnlyDatabase + synced bool + chainStarted bool + stateNotifier statefeed.Notifier + blocksRateLimiter *leakybucket.Collector } // NewInitialSync configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewInitialSync(cfg *Config) *Service { return &Service{ - ctx: context.Background(), - chain: cfg.Chain, - p2p: cfg.P2P, - db: cfg.DB, - stateNotifier: cfg.StateNotifier, + ctx: context.Background(), + chain: cfg.Chain, + p2p: cfg.P2P, + db: cfg.DB, + stateNotifier: cfg.StateNotifier, + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), } }