From 129bc763ee062461354ef317eb3cc42003c8f021 Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Wed, 15 Jan 2020 17:19:06 -0800 Subject: [PATCH] Rate limiter for rpc beacon blocks (#4549) * use rate limiter for rpc beacon blocks * gofmt * don't delete empty buckets * disconnect bad peers * tell peer they are being rate limited * defer disconnect * fix tests * set burst to x10 Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- WORKSPACE | 7 +++++++ beacon-chain/sync/BUILD.bazel | 2 ++ beacon-chain/sync/error.go | 1 + .../sync/rpc_beacon_blocks_by_range.go | 21 +++++++++++++++++++ .../sync/rpc_beacon_blocks_by_range_test.go | 3 ++- .../sync/rpc_beacon_blocks_by_root.go | 19 +++++++++++++++++ .../sync/rpc_beacon_blocks_by_root_test.go | 4 +++- beacon-chain/sync/service.go | 6 ++++++ 8 files changed, 61 insertions(+), 2 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index f7402fe51c..bf952e72bc 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -1473,3 +1473,10 @@ go_repository( commit = "d7df74196a9e781ede915320c11c378c1b2f3a1f", importpath = "github.com/cespare/xxhash", ) + +go_repository( + name = "com_github_kevinms_leakybucket_go", + importpath = "github.com/kevinms/leakybucket-go", + sum = "h1:oq6BiN7v0MfWCRcJAxSV+hesVMAAV8COrQbTjYNnso4=", + version = "v0.0.0-20190611015032-8a3d0352aa79", +) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 859cc776a7..b77fdea212 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -56,6 +56,7 @@ go_library( "//shared/slotutil:go_default_library", "//shared/traceutil:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p_core//:go_default_library", "@com_github_libp2p_go_libp2p_core//network:go_default_library", "@com_github_libp2p_go_libp2p_core//peer:go_default_library", @@ -115,6 +116,7 @@ go_test( "//shared/params:go_default_library", "//shared/testutil:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_kevinms_leakybucket_go//:go_default_library", "@com_github_libp2p_go_libp2p_core//:go_default_library", "@com_github_libp2p_go_libp2p_core//network:go_default_library", "@com_github_libp2p_go_libp2p_core//protocol:go_default_library", diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 82fce4465b..1621788660 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -9,6 +9,7 @@ import ( ) const genericError = "internal service error" +const rateLimitedError = "rate limited" var errWrongForkVersion = errors.New("wrong fork version") var errInvalidEpoch = errors.New("invalid epoch") diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index 72b61af783..44a1a38487 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -27,6 +27,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa startSlot := m.StartSlot endSlot := startSlot + (m.Step * (m.Count - 1)) + remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) span.AddAttributes( trace.Int64Attribute("start", int64(startSlot)), @@ -34,8 +35,28 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa trace.Int64Attribute("step", int64(m.Step)), trace.Int64Attribute("count", int64(m.Count)), trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()), + trace.Int64Attribute("remaining_capacity", remainingBucketCapacity), ) + if m.Count > uint64(remainingBucketCapacity) { + r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { + log.Debug("Disconnecting bad peer") + defer r.p2p.Disconnect(stream.Conn().RemotePeer()) + } + resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError) + if err != nil { + log.WithError(err).Error("Failed to generate a response error") + } else { + if _, err := stream.Write(resp); err != nil { + log.WithError(err).Errorf("Failed to write to stream") + } + } + return errors.New(rateLimitedError) + } + + r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(m.Count)) + // TODO(3147): Update this with reasonable constraints. if endSlot-startSlot > 1000 || m.Step == 0 { resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "invalid range or step") diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index b7cb3b6f02..25a42771df 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/protocol" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -38,7 +39,7 @@ func TestBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) { } } - r := &Service{p2p: p1, db: d} + r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false)} pcl := protocol.ID("/testing") var wg sync.WaitGroup diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index d7ef6f669d..3d60de4fe1 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -64,6 +64,25 @@ func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ return errors.New("no block roots provided") } + if int64(len(blockRoots)) > r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) { + r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { + log.Debug("Disconnecting bad peer") + defer r.p2p.Disconnect(stream.Conn().RemotePeer()) + } + resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError) + if err != nil { + log.WithError(err).Error("Failed to generate a response error") + } else { + if _, err := stream.Write(resp); err != nil { + log.WithError(err).Errorf("Failed to write to stream") + } + } + return errors.New(rateLimitedError) + } + + r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(len(blockRoots))) + for _, root := range blockRoots { blk, err := r.db.Block(ctx, root) if err != nil { diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index 88b5b36157..da21ace4aa 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/protocol" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -45,7 +46,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) { blkRoots = append(blkRoots, root) } - r := &Service{p2p: p1, db: d} + r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false)} pcl := protocol.ID("/testing") var wg sync.WaitGroup @@ -118,6 +119,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) { slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), ctx: context.Background(), + blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false), } // Setup streams diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 27eb96cb6f..bfbe2a432e 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/kevinms/leakybucket-go" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" @@ -16,6 +17,9 @@ import ( var _ = shared.Service(&Service{}) +const allowedBlocksPerSecond = 32.0 +const allowedBlocksBurst = 10 * allowedBlocksPerSecond + // Config to set up the regular sync service. type Config struct { P2P p2p.P2P @@ -50,6 +54,7 @@ func NewRegularSync(cfg *Config) *Service { slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), stateNotifier: cfg.StateNotifier, + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), } r.registerRPCHandlers() @@ -74,6 +79,7 @@ type Service struct { initialSync Checker validateBlockLock sync.RWMutex stateNotifier statefeed.Notifier + blocksRateLimiter *leakybucket.Collector } // Start the regular sync service.