Rate limiter for rpc beacon blocks (#4549)

* use rate limiter for rpc beacon blocks

* gofmt

* don't delete empty buckets

* disconnect bad peers

* tell peer they are being rate limited

* defer disconnect

* fix tests

* set burst to x10

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Preston Van Loon
2020-01-15 17:19:06 -08:00
committed by GitHub
parent 452cadc286
commit 129bc763ee
8 changed files with 61 additions and 2 deletions

View File

@@ -1473,3 +1473,10 @@ go_repository(
commit = "d7df74196a9e781ede915320c11c378c1b2f3a1f",
importpath = "github.com/cespare/xxhash",
)
go_repository(
name = "com_github_kevinms_leakybucket_go",
importpath = "github.com/kevinms/leakybucket-go",
sum = "h1:oq6BiN7v0MfWCRcJAxSV+hesVMAAV8COrQbTjYNnso4=",
version = "v0.0.0-20190611015032-8a3d0352aa79",
)

View File

@@ -56,6 +56,7 @@ go_library(
"//shared/slotutil:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
@@ -115,6 +116,7 @@ go_test(
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//protocol:go_default_library",

View File

@@ -9,6 +9,7 @@ import (
)
const genericError = "internal service error"
const rateLimitedError = "rate limited"
var errWrongForkVersion = errors.New("wrong fork version")
var errInvalidEpoch = errors.New("invalid epoch")

View File

@@ -27,6 +27,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
startSlot := m.StartSlot
endSlot := startSlot + (m.Step * (m.Count - 1))
remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String())
span.AddAttributes(
trace.Int64Attribute("start", int64(startSlot)),
@@ -34,8 +35,28 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
trace.Int64Attribute("step", int64(m.Step)),
trace.Int64Attribute("count", int64(m.Count)),
trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()),
trace.Int64Attribute("remaining_capacity", remainingBucketCapacity),
)
if m.Count > uint64(remainingBucketCapacity) {
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer r.p2p.Disconnect(stream.Conn().RemotePeer())
}
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
}
return errors.New(rateLimitedError)
}
r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(m.Count))
// TODO(3147): Update this with reasonable constraints.
if endSlot-startSlot > 1000 || m.Step == 0 {
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "invalid range or step")

View File

@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@@ -38,7 +39,7 @@ func TestBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
}
}
r := &Service{p2p: p1, db: d}
r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false)}
pcl := protocol.ID("/testing")
var wg sync.WaitGroup

View File

@@ -64,6 +64,25 @@ func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return errors.New("no block roots provided")
}
if int64(len(blockRoots)) > r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) {
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer r.p2p.Disconnect(stream.Conn().RemotePeer())
}
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
}
return errors.New(rateLimitedError)
}
r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(len(blockRoots)))
for _, root := range blockRoots {
blk, err := r.db.Block(ctx, root)
if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@@ -45,7 +46,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
blkRoots = append(blkRoots, root)
}
r := &Service{p2p: p1, db: d}
r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false)}
pcl := protocol.ID("/testing")
var wg sync.WaitGroup
@@ -118,6 +119,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
ctx: context.Background(),
blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false),
}
// Setup streams

View File

@@ -4,6 +4,7 @@ import (
"context"
"sync"
"github.com/kevinms/leakybucket-go"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
@@ -16,6 +17,9 @@ import (
var _ = shared.Service(&Service{})
const allowedBlocksPerSecond = 32.0
const allowedBlocksBurst = 10 * allowedBlocksPerSecond
// Config to set up the regular sync service.
type Config struct {
P2P p2p.P2P
@@ -50,6 +54,7 @@ func NewRegularSync(cfg *Config) *Service {
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
stateNotifier: cfg.StateNotifier,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}
r.registerRPCHandlers()
@@ -74,6 +79,7 @@ type Service struct {
initialSync Checker
validateBlockLock sync.RWMutex
stateNotifier statefeed.Notifier
blocksRateLimiter *leakybucket.Collector
}
// Start the regular sync service.