Compare commits

...

1 Commit

Author: Kasey Kirkham
SHA1: 90599db43e
Message: decouple req/resp limits; don't use block limit
Date: 2024-01-27 08:49:17 -06:00
9 changed files with 35 additions and 19 deletions

View File

@@ -93,6 +93,7 @@ type blocksFetcher struct {
     db db.ReadOnlyDatabase
     blocksPerPeriod uint64
     rateLimiter *leakybucket.Collector
+    blobRateLimiter *leakybucket.Collector
     peerLocks map[peer.ID]*peerLock
     fetchRequests chan *fetchRequestParams
     fetchResponses chan *fetchRequestResponse
@@ -127,11 +128,16 @@ type fetchRequestResponse struct {
 // newBlocksFetcher creates ready to use fetcher.
 func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
     blocksPerPeriod := flags.Get().BlockBatchLimit
+    blobsPerPeriod := flags.Get().BlobBatchLimit
     allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit
+    allowedBlobsBurst := flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit
     // Allow fetcher to go almost to the full burst capacity (less a single batch).
     rateLimiter := leakybucket.NewCollector(
         float64(blocksPerPeriod), int64(allowedBlocksBurst-blocksPerPeriod),
         blockLimiterPeriod, false /* deleteEmptyBuckets */)
+    blobRateLimiter := leakybucket.NewCollector(
+        float64(blobsPerPeriod), int64(allowedBlobsBurst-blobsPerPeriod),
+        blockLimiterPeriod, false)
     capacityWeight := cfg.peerFilterCapacityWeight
     if capacityWeight >= 1 {
@@ -150,6 +156,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
     db: cfg.db,
     blocksPerPeriod: uint64(blocksPerPeriod),
     rateLimiter: rateLimiter,
+    blobRateLimiter: blobRateLimiter,
     peerLocks: make(map[peer.ID]*peerLock),
     fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
     fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
@@ -493,13 +500,13 @@ func (f *blocksFetcher) requestBlocks(
"capacity": f.rateLimiter.Remaining(pid.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
}).Debug("Requesting blocks")
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
if err := f.waitForBandwidth(pid, req.Count); err != nil {
if f.blobRateLimiter.Remaining(pid.String()) < int64(req.Count) {
if err := f.waitForBandwidth(pid, req.Count, f.blobRateLimiter); err != nil {
l.Unlock()
return nil, err
}
}
f.rateLimiter.Add(pid.String(), int64(req.Count))
f.blobRateLimiter.Add(pid.String(), int64(req.Count))
l.Unlock()
return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.chain, f.p2p, pid, req, nil)
}
@@ -514,19 +521,19 @@ func (f *blocksFetcher) requestBlobs(ctx context.Context, req *p2ppb.BlobSidecar
"peer": pid,
"start": req.StartSlot,
"count": req.Count,
"capacity": f.rateLimiter.Remaining(pid.String()),
"capacity": f.blobRateLimiter.Remaining(pid.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
}).Debug("Requesting blobs")
// We're intentionally abusing the block rate limit here, treating blob requests as if they were block requests.
// Since blob requests take more bandwidth than blocks, we should improve how we account for the different kinds
// of requests, more in proportion to the cost of serving them.
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
if err := f.waitForBandwidth(pid, req.Count); err != nil {
if f.blobRateLimiter.Remaining(pid.String()) < int64(req.Count) {
if err := f.waitForBandwidth(pid, req.Count, f.blobRateLimiter); err != nil {
l.Unlock()
return nil, err
}
}
f.rateLimiter.Add(pid.String(), int64(req.Count))
f.blobRateLimiter.Add(pid.String(), int64(req.Count))
l.Unlock()
return prysmsync.SendBlobsByRangeRequest(ctx, f.clock, f.p2p, pid, f.ctxMap, req)
}
@@ -549,7 +556,7 @@ func (f *blocksFetcher) requestBlocksByRoot(
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
}).Debug("Requesting blocks (by roots)")
if f.rateLimiter.Remaining(pid.String()) < int64(len(*req)) {
if err := f.waitForBandwidth(pid, uint64(len(*req))); err != nil {
if err := f.waitForBandwidth(pid, uint64(len(*req)), f.rateLimiter); err != nil {
l.Unlock()
return nil, err
}
@@ -561,9 +568,9 @@ func (f *blocksFetcher) requestBlocksByRoot(
 }
 // waitForBandwidth blocks up until peer's bandwidth is restored.
-func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
+func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64, collector *leakybucket.Collector) error {
     log.WithField("peer", pid).Debug("Slowing down for rate limit")
-    rem := f.rateLimiter.Remaining(pid.String())
+    rem := collector.Remaining(pid.String())
     if uint64(rem) >= count {
         // Exit early if we have sufficient capacity
         return nil
@@ -572,7 +579,7 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
     if err != nil {
         return err
     }
-    toWait := timeToWait(int64(intCount), rem, f.rateLimiter.Capacity(), f.rateLimiter.TillEmpty(pid.String()))
+    toWait := timeToWait(int64(intCount), rem, collector.Capacity(), f.rateLimiter.TillEmpty(pid.String()))
     timer := time.NewTimer(toWait)
     defer timer.Stop()
     select {
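
Taken together, the hunks above keep two independent leaky-bucket collectors in the fetcher and make the shared wait helper take the collector to charge as an explicit argument. Below is a minimal, self-contained sketch of that shape, not the fetcher itself: the import path, rates, bursts, and period are assumptions, and unlike the hunk above (which still reads TillEmpty from f.rateLimiter) the helper here waits on whichever collector it was handed.

// Sketch only: the decoupled-collector pattern from the diff above, outside
// the fetcher. The leaky-bucket import path is assumed; rates, bursts, and
// the period are illustrative values, not the flag defaults.
package main

import (
    "time"

    leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
)

// waitFor blocks until the given collector has capacity for count units.
// The real helper computes a proportional wait with timeToWait(); this
// sketch simply waits until the peer's bucket has fully drained.
func waitFor(c *leakybucket.Collector, pid string, count uint64) {
    if uint64(c.Remaining(pid)) >= count {
        return // enough capacity already, no need to wait
    }
    time.Sleep(c.TillEmpty(pid))
}

func main() {
    const period = 30 * time.Second // stand-in for blockLimiterPeriod
    blockLimiter := leakybucket.NewCollector(64, 128, period, false)
    blobLimiter := leakybucket.NewCollector(16, 32, period, false)

    pid := "example-peer"
    // Block and blob requests now draw down separate buckets, so a burst of
    // blob traffic no longer consumes the block-request budget.
    waitFor(blockLimiter, pid, 64)
    blockLimiter.Add(pid, 64)

    waitFor(blobLimiter, pid, 16)
    blobLimiter.Add(pid, 16)
}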

View File

@@ -626,12 +626,12 @@ func TestBlocksFetcher_WaitForBandwidth(t *testing.T) {
     vr := [32]byte{}
     fetcher.chain = &mock.ChainService{Genesis: gt, ValidatorsRoot: vr}
     start := time.Now()
-    assert.NoError(t, fetcher.waitForBandwidth(p2.PeerID(), 10))
+    assert.NoError(t, fetcher.waitForBandwidth(p2.PeerID(), 10, fetcher.rateLimiter))
     dur := time.Since(start)
     assert.Equal(t, true, dur < time.Millisecond, "waited excessively for bandwidth")
     fetcher.rateLimiter.Add(p2.PeerID().String(), int64(req.Count*burstFactor))
     start = time.Now()
-    assert.NoError(t, fetcher.waitForBandwidth(p2.PeerID(), req.Count))
+    assert.NoError(t, fetcher.waitForBandwidth(p2.PeerID(), req.Count, fetcher.rateLimiter))
     dur = time.Since(start)
     assert.Equal(t, float64(5), dur.Truncate(1*time.Second).Seconds(), "waited excessively for bandwidth")
 }

View File

@@ -43,8 +43,8 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
     allowedBlocksBurst := int64(flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit)
     // Initialize blob limits.
-    allowedBlobsPerSecond := float64(flags.Get().BlobBatchLimit)
-    allowedBlobsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit)
+    allowedBlobsPerSecond := float64(flags.Get().BlobServingBatchLimit)
+    allowedBlobsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobServingBatchLimit)
     // Set topic map for all rpc topics.
     topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings))
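
Read alongside the flag changes further down, this hunk sizes the server-side limiter from BlobServingBatchLimit (default 64) while the fetcher above keeps sizing its request-side limiter from BlobBatchLimit (default 16). A small sketch of the serving-side arithmetic follows; the import path, the one-second period, the burst-factor default, and the topic key are assumptions.

// Sketch only: a serving-side blob collector sized from the new serving
// limit. Import path, period, burst factor, and the topic key are assumed.
package main

import (
    "fmt"
    "time"

    leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
)

func main() {
    blobServingBatchLimit := 64    // default from the flags hunk below
    blobBatchLimitBurstFactor := 2 // assumed default; not shown in this diff

    allowedBlobsPerSecond := float64(blobServingBatchLimit)
    allowedBlobsBurst := int64(blobBatchLimitBurstFactor * blobServingBatchLimit)

    // One collector per RPC topic; the real limiter keys this map by the
    // entries of p2p.RPCTopicMappings rather than a literal string.
    topicMap := map[string]*leakybucket.Collector{
        "blob_sidecars_by_range": leakybucket.NewCollector(
            allowedBlobsPerSecond, allowedBlobsBurst, time.Second, false),
    }
    for topic, c := range topicMap {
        fmt.Printf("%s: burst capacity %d\n", topic, c.Capacity())
    }
}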

View File

@@ -58,7 +58,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
     // Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
     sort.Sort(blobIdents)
-    batchSize := flags.Get().BlobBatchLimit
+    batchSize := flags.Get().BlobServingBatchLimit
     var ticker *time.Ticker
     if len(blobIdents) > batchSize {
         ticker = time.NewTicker(time.Second)
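
For context on the path this limit feeds: the handler creates a once-per-second ticker whenever more than batchSize identifiers are requested, which suggests it serves up to batchSize sidecars and then waits for the next tick. A stand-alone sketch of that batch-then-tick shape is below; the item type and names are illustrative, and batchSize would come from flags.Get().BlobServingBatchLimit.

// Sketch only: batch-then-tick throttling in the style suggested by the
// handler above. The item type and the send step are placeholders.
package main

import (
    "fmt"
    "time"
)

func serveBatched(items []string, batchSize int) {
    var ticker *time.Ticker
    if len(items) > batchSize {
        ticker = time.NewTicker(time.Second)
        defer ticker.Stop()
    }
    for i, item := range items {
        // After each full batch, wait for the next tick before continuing.
        if ticker != nil && i != 0 && i%batchSize == 0 {
            <-ticker.C
        }
        fmt.Println("sending", item) // stand-in for writing a sidecar to the stream
    }
}

func main() {
    serveBatched([]string{"a", "b", "c", "d", "e"}, 2)
}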

View File

@@ -167,11 +167,16 @@ var (
Usage: "The factor by which block batch limit may increase on burst.",
Value: 2,
}
BlobServingBatchLimit = &cli.IntFlag{
Name: "blob-batch-limit",
Usage: "Maximum number of blob slots for the server to allow in a batch.",
Value: 64,
}
// BlobBatchLimit specifies the requested blob batch size.
BlobBatchLimit = &cli.IntFlag{
Name: "blob-batch-limit",
Usage: "The amount of blobs the local peer is bounded to request and respond to in a batch.",
Value: 64,
Usage: "Max number of blob slots to request in a batch.",
Value: 16,
}
// BlobBatchLimitBurstFactor specifies the factor by which blob batch size may increase.
BlobBatchLimitBurstFactor = &cli.IntFlag{
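
A runnable illustration of how the two limits are read, assuming urfave/cli/v2 (which the Name/Usage/Value fields and the ctx.Int calls in this commit suggest). Both definitions above register the CLI name "blob-batch-limit" as shown; the sketch uses distinct names purely so the two flags can coexist in one app.

// Sketch only: a serving-side and a request-side blob batch limit as two
// separate CLI flags. The flag names here are illustrative.
package main

import (
    "fmt"
    "log"
    "os"

    "github.com/urfave/cli/v2"
)

func main() {
    app := &cli.App{
        Flags: []cli.Flag{
            &cli.IntFlag{
                Name:  "blob-serving-batch-limit",
                Usage: "Maximum number of blob slots for the server to allow in a batch.",
                Value: 64,
            },
            &cli.IntFlag{
                Name:  "blob-request-batch-limit",
                Usage: "Max number of blob slots to request in a batch.",
                Value: 16,
            },
        },
        Action: func(ctx *cli.Context) error {
            // The beacon node copies these into its GlobalFlags config at startup.
            fmt.Println("serving limit:", ctx.Int("blob-serving-batch-limit"))
            fmt.Println("request limit:", ctx.Int("blob-request-batch-limit"))
            return nil
        },
    }
    if err := app.Run(os.Args); err != nil {
        log.Fatal(err)
    }
}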

View File

@@ -14,6 +14,7 @@ type GlobalFlags struct {
     BlockBatchLimit int
     BlockBatchLimitBurstFactor int
     BlobBatchLimit int
+    BlobServingBatchLimit int
     BlobBatchLimitBurstFactor int
 }
@@ -43,6 +44,7 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
     cfg.BlockBatchLimit = ctx.Int(BlockBatchLimit.Name)
     cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name)
     cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
+    cfg.BlobServingBatchLimit = ctx.Int(BlobServingBatchLimit.Name)
     cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
     cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
     configureMinimumPeers(ctx, cfg)

View File

@@ -57,7 +57,7 @@ var appFlags = []cli.Flag{
     flags.SetGCPercent,
     flags.BlockBatchLimit,
     flags.BlockBatchLimitBurstFactor,
-    flags.BlobBatchLimit,
+    flags.BlobServingBatchLimit,
     flags.BlobBatchLimitBurstFactor,
     flags.InteropMockEth1DataVotesFlag,
     flags.InteropNumValidatorsFlag,

View File

@@ -114,6 +114,7 @@ var appHelpFlagGroups = []flagGroup{
     flags.BlockBatchLimit,
     flags.BlockBatchLimitBurstFactor,
     flags.BlobBatchLimit,
+    flags.BlobServingBatchLimit,
     flags.BlobBatchLimitBurstFactor,
     flags.EnableDebugRPCEndpoints,
     flags.SubscribeToAllSubnets,

View File

@@ -269,6 +269,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%d", flags.BlockBatchLimitBurstFactor.Name, 8),
fmt.Sprintf("--%s=%d", flags.BlobBatchLimitBurstFactor.Name, 16),
fmt.Sprintf("--%s=%d", flags.BlobBatchLimit.Name, 256),
fmt.Sprintf("--%s=%d", flags.BlobServingBatchLimit.Name, 256),
fmt.Sprintf("--%s=%s", cmdshared.ChainConfigFileFlag.Name, cfgPath),
"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=1",
"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=2",