Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-10 05:47:59 -05:00)
Compare commits (1 commit)
topic-bug- ... blob-rate-
| Author | SHA1 | Date |
| --- | --- | --- |
|  | 90599db43e |  |
@@ -93,6 +93,7 @@ type blocksFetcher struct {
 	db              db.ReadOnlyDatabase
 	blocksPerPeriod uint64
 	rateLimiter     *leakybucket.Collector
+	blobRateLimiter *leakybucket.Collector
 	peerLocks       map[peer.ID]*peerLock
 	fetchRequests   chan *fetchRequestParams
 	fetchResponses  chan *fetchRequestResponse
@@ -127,11 +128,16 @@ type fetchRequestResponse struct {
 // newBlocksFetcher creates ready to use fetcher.
 func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
 	blocksPerPeriod := flags.Get().BlockBatchLimit
+	blobsPerPeriod := flags.Get().BlobBatchLimit
 	allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit
+	allowedBlobsBurst := flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit
 	// Allow fetcher to go almost to the full burst capacity (less a single batch).
 	rateLimiter := leakybucket.NewCollector(
 		float64(blocksPerPeriod), int64(allowedBlocksBurst-blocksPerPeriod),
 		blockLimiterPeriod, false /* deleteEmptyBuckets */)
+	blobRateLimiter := leakybucket.NewCollector(
+		float64(blobsPerPeriod), int64(allowedBlobsBurst-blobsPerPeriod),
+		blockLimiterPeriod, false)

 	capacityWeight := cfg.peerFilterCapacityWeight
 	if capacityWeight >= 1 {
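To make the burst arithmetic above concrete, here is a self-contained sketch (not the real constructor) of the arguments the fetcher passes to `leakybucket.NewCollector`. The flag values below are assumptions for illustration; in the real code they come from `flags.Get()` and the collector comes from the leaky-bucket package.

```go
package main

import "fmt"

func main() {
	// Assumed flag values for illustration only; the fetcher reads them from flags.Get().
	blobsPerPeriod := 16 // e.g. BlobBatchLimit
	blobBurstFactor := 2 // e.g. BlobBatchLimitBurstFactor (value assumed)

	// Mirrors the diff: the allowed burst is factor * per-period batch size...
	allowedBlobsBurst := blobBurstFactor * blobsPerPeriod

	// ...and the collector's capacity argument is the burst less a single batch,
	// so the fetcher can approach, but never exhaust, the full burst allowance.
	rate := float64(blobsPerPeriod)                       // leak rate per period
	capacity := int64(allowedBlobsBurst - blobsPerPeriod) // remaining burst headroom

	fmt.Printf("NewCollector(rate=%v, capacity=%v, blockLimiterPeriod, false)\n", rate, capacity)
}
```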
@@ -150,6 +156,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
 		db:              cfg.db,
 		blocksPerPeriod: uint64(blocksPerPeriod),
 		rateLimiter:     rateLimiter,
+		blobRateLimiter: blobRateLimiter,
 		peerLocks:       make(map[peer.ID]*peerLock),
 		fetchRequests:   make(chan *fetchRequestParams, maxPendingRequests),
 		fetchResponses:  make(chan *fetchRequestResponse, maxPendingRequests),
@@ -493,13 +500,13 @@ func (f *blocksFetcher) requestBlocks(
 		"capacity": f.rateLimiter.Remaining(pid.String()),
 		"score":    f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
 	}).Debug("Requesting blocks")
-	if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
-		if err := f.waitForBandwidth(pid, req.Count); err != nil {
+	if f.blobRateLimiter.Remaining(pid.String()) < int64(req.Count) {
+		if err := f.waitForBandwidth(pid, req.Count, f.blobRateLimiter); err != nil {
 			l.Unlock()
 			return nil, err
 		}
 	}
-	f.rateLimiter.Add(pid.String(), int64(req.Count))
+	f.blobRateLimiter.Add(pid.String(), int64(req.Count))
 	l.Unlock()
 	return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.chain, f.p2p, pid, req, nil)
 }
@@ -514,19 +521,19 @@ func (f *blocksFetcher) requestBlobs(ctx context.Context, req *p2ppb.BlobSidecar
 		"peer":     pid,
 		"start":    req.StartSlot,
 		"count":    req.Count,
-		"capacity": f.rateLimiter.Remaining(pid.String()),
+		"capacity": f.blobRateLimiter.Remaining(pid.String()),
 		"score":    f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
 	}).Debug("Requesting blobs")
 	// We're intentionally abusing the block rate limit here, treating blob requests as if they were block requests.
 	// Since blob requests take more bandwidth than blocks, we should improve how we account for the different kinds
 	// of requests, more in proportion to the cost of serving them.
-	if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
-		if err := f.waitForBandwidth(pid, req.Count); err != nil {
+	if f.blobRateLimiter.Remaining(pid.String()) < int64(req.Count) {
+		if err := f.waitForBandwidth(pid, req.Count, f.blobRateLimiter); err != nil {
 			l.Unlock()
 			return nil, err
 		}
 	}
-	f.rateLimiter.Add(pid.String(), int64(req.Count))
+	f.blobRateLimiter.Add(pid.String(), int64(req.Count))
 	l.Unlock()
 	return prysmsync.SendBlobsByRangeRequest(ctx, f.clock, f.p2p, pid, f.ctxMap, req)
 }
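Both request paths above follow the same check/wait/charge pattern against whichever collector applies. Below is a minimal, self-contained sketch of that pattern; the `limiter` interface and `mapLimiter` type are stand-ins for the subset of `*leakybucket.Collector` methods used in the diff, not the real type.

```go
package main

import "fmt"

// limiter stands in for the *leakybucket.Collector methods the fetcher calls.
type limiter interface {
	Remaining(key string) int64
	Add(key string, amount int64)
}

// mapLimiter is a toy in-memory budget tracker used only for this demo.
type mapLimiter struct{ budget map[string]int64 }

func (m *mapLimiter) Remaining(key string) int64   { return m.budget[key] }
func (m *mapLimiter) Add(key string, amount int64) { m.budget[key] -= amount }

// requestWithBudget mirrors the flow in requestBlocks/requestBlobs: check the
// peer's remaining budget, wait if it is insufficient, then charge the same
// collector that was checked before issuing the network request.
func requestWithBudget(l limiter, peerID string, count uint64, wait, send func() error) error {
	if l.Remaining(peerID) < int64(count) {
		if err := wait(); err != nil {
			return err
		}
	}
	l.Add(peerID, int64(count))
	return send()
}

func main() {
	l := &mapLimiter{budget: map[string]int64{"peerA": 32}}
	err := requestWithBudget(l, "peerA", 16,
		func() error { fmt.Println("waiting for bandwidth"); return nil },
		func() error { fmt.Println("sending request"); return nil },
	)
	fmt.Println("err:", err, "remaining:", l.Remaining("peerA"))
}
```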
@@ -549,7 +556,7 @@ func (f *blocksFetcher) requestBlocksByRoot(
 		"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
 	}).Debug("Requesting blocks (by roots)")
 	if f.rateLimiter.Remaining(pid.String()) < int64(len(*req)) {
-		if err := f.waitForBandwidth(pid, uint64(len(*req))); err != nil {
+		if err := f.waitForBandwidth(pid, uint64(len(*req)), f.rateLimiter); err != nil {
 			l.Unlock()
 			return nil, err
 		}
@@ -561,9 +568,9 @@ func (f *blocksFetcher) requestBlocksByRoot(
 }

 // waitForBandwidth blocks up until peer's bandwidth is restored.
-func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
+func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64, collector *leakybucket.Collector) error {
 	log.WithField("peer", pid).Debug("Slowing down for rate limit")
-	rem := f.rateLimiter.Remaining(pid.String())
+	rem := collector.Remaining(pid.String())
 	if uint64(rem) >= count {
 		// Exit early if we have sufficient capacity
 		return nil
@@ -572,7 +579,7 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
 	if err != nil {
 		return err
 	}
-	toWait := timeToWait(int64(intCount), rem, f.rateLimiter.Capacity(), f.rateLimiter.TillEmpty(pid.String()))
+	toWait := timeToWait(int64(intCount), rem, collector.Capacity(), f.rateLimiter.TillEmpty(pid.String()))
 	timer := time.NewTimer(toWait)
 	defer timer.Stop()
 	select {

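The refactored `waitForBandwidth` now waits against whichever collector is passed in. The body of the `timeToWait` helper is not part of this diff; the sketch below is only an assumed illustration of the idea, i.e. waiting in proportion to the token shortfall given the bucket's capacity and drain time.

```go
package main

import (
	"fmt"
	"time"
)

// approxTimeToWait is a hypothetical stand-in for the timeToWait helper referenced
// above (its implementation is not shown in the diff). Idea: a bucket holding
// `capacity` tokens drains in `tillEmpty`, so each token frees up roughly every
// tillEmpty/capacity; wait long enough for the shortfall to be freed.
func approxTimeToWait(count, rem, capacity int64, tillEmpty time.Duration) time.Duration {
	shortfall := count - rem
	if shortfall <= 0 || capacity <= 0 {
		return 0
	}
	return time.Duration(shortfall) * (tillEmpty / time.Duration(capacity))
}

func main() {
	// Assumed numbers: need 64 tokens, 16 remain, capacity 64, bucket empties in 10s.
	fmt.Println(approxTimeToWait(64, 16, 64, 10*time.Second)) // 7.5s under these assumptions
}
```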
@@ -626,12 +626,12 @@ func TestBlocksFetcher_WaitForBandwidth(t *testing.T) {
 	vr := [32]byte{}
 	fetcher.chain = &mock.ChainService{Genesis: gt, ValidatorsRoot: vr}
 	start := time.Now()
-	assert.NoError(t, fetcher.waitForBandwidth(p2.PeerID(), 10))
+	assert.NoError(t, fetcher.waitForBandwidth(p2.PeerID(), 10, fetcher.rateLimiter))
 	dur := time.Since(start)
 	assert.Equal(t, true, dur < time.Millisecond, "waited excessively for bandwidth")
 	fetcher.rateLimiter.Add(p2.PeerID().String(), int64(req.Count*burstFactor))
 	start = time.Now()
-	assert.NoError(t, fetcher.waitForBandwidth(p2.PeerID(), req.Count))
+	assert.NoError(t, fetcher.waitForBandwidth(p2.PeerID(), req.Count, fetcher.rateLimiter))
 	dur = time.Since(start)
 	assert.Equal(t, float64(5), dur.Truncate(1*time.Second).Seconds(), "waited excessively for bandwidth")
 }

@@ -43,8 +43,8 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
 	allowedBlocksBurst := int64(flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit)

 	// Initialize blob limits.
-	allowedBlobsPerSecond := float64(flags.Get().BlobBatchLimit)
-	allowedBlobsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit)
+	allowedBlobsPerSecond := float64(flags.Get().BlobServingBatchLimit)
+	allowedBlobsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobServingBatchLimit)

 	// Set topic map for all rpc topics.
 	topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings))

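For context on the `topicMap` line above: the server keeps one collector per RPC topic and charges it per peer key. A small sketch of that shape, using an assumed `simpleCollector` in place of `*leakybucket.Collector`; the topic strings and capacities are illustrative, not Prysm's real values.

```go
package main

import "fmt"

// simpleCollector is an assumed stand-in for *leakybucket.Collector, tracking a
// remaining budget per peer key for a single RPC topic.
type simpleCollector struct {
	capacity  int64
	remaining map[string]int64
}

func newSimpleCollector(capacity int64) *simpleCollector {
	return &simpleCollector{capacity: capacity, remaining: map[string]int64{}}
}

// Remaining lazily initializes a peer's budget to the full capacity.
func (c *simpleCollector) Remaining(key string) int64 {
	if _, ok := c.remaining[key]; !ok {
		c.remaining[key] = c.capacity
	}
	return c.remaining[key]
}

// Add charges the peer's budget for the given amount.
func (c *simpleCollector) Add(key string, amount int64) {
	c.remaining[key] = c.Remaining(key) - amount
}

func main() {
	// One collector per topic, as the topicMap in newRateLimiter suggests.
	topicMap := map[string]*simpleCollector{
		"/eth2/beacon_chain/req/blocks_by_range": newSimpleCollector(64),
		"/eth2/beacon_chain/req/blob_sidecars":   newSimpleCollector(128),
	}
	blobs := topicMap["/eth2/beacon_chain/req/blob_sidecars"]
	blobs.Add("peerA", 32)
	fmt.Println("peerA blob budget left:", blobs.Remaining("peerA"))
}
```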
@@ -58,7 +58,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
 	// Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
 	sort.Sort(blobIdents)

-	batchSize := flags.Get().BlobBatchLimit
+	batchSize := flags.Get().BlobServingBatchLimit
 	var ticker *time.Ticker
 	if len(blobIdents) > batchSize {
 		ticker = time.NewTicker(time.Second)

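The handler change above only swaps which flag sizes the batch; the ticker setup implies a serve-then-throttle loop for large requests. A hedged sketch of that general pattern follows; the names and loop structure are assumptions, not the handler's actual code.

```go
package main

import (
	"fmt"
	"time"
)

// serveInBatches illustrates the batching idea suggested by the handler above:
// send up to batchSize items, then wait for a ticker tick before the next batch.
func serveInBatches(items []string, batchSize int, tick <-chan time.Time, send func(string)) {
	for i, item := range items {
		if i > 0 && i%batchSize == 0 {
			<-tick // throttle between batches
		}
		send(item)
	}
}

func main() {
	// The real handler uses time.Second; shortened here so the demo finishes quickly.
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	idents := []string{"blob-0", "blob-1", "blob-2", "blob-3", "blob-4"}
	serveInBatches(idents, 2, ticker.C, func(id string) { fmt.Println("served", id) })
}
```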
@@ -167,11 +167,16 @@ var (
 		Usage: "The factor by which block batch limit may increase on burst.",
 		Value: 2,
 	}
+	BlobServingBatchLimit = &cli.IntFlag{
+		Name:  "blob-batch-limit",
+		Usage: "Maximum number of blob slots for the server to allow in a batch.",
+		Value: 64,
+	}
 	// BlobBatchLimit specifies the requested blob batch size.
 	BlobBatchLimit = &cli.IntFlag{
 		Name:  "blob-batch-limit",
-		Usage: "The amount of blobs the local peer is bounded to request and respond to in a batch.",
-		Value: 64,
+		Usage: "Max number of blob slots to request in a batch.",
+		Value: 16,
 	}
 	// BlobBatchLimitBurstFactor specifies the factor by which blob batch size may increase.
 	BlobBatchLimitBurstFactor = &cli.IntFlag{

@@ -14,6 +14,7 @@ type GlobalFlags struct {
 	BlockBatchLimit            int
 	BlockBatchLimitBurstFactor int
 	BlobBatchLimit             int
+	BlobServingBatchLimit      int
 	BlobBatchLimitBurstFactor  int
 }

@@ -43,6 +44,7 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
 	cfg.BlockBatchLimit = ctx.Int(BlockBatchLimit.Name)
 	cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name)
 	cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
+	cfg.BlobServingBatchLimit = ctx.Int(BlobServingBatchLimit.Name)
 	cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
 	cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
 	configureMinimumPeers(ctx, cfg)

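The flag plumbing above follows the usual urfave/cli pattern (the `&cli.IntFlag`/`ctx.Int` calls in the diff suggest the v2 API). Below is a minimal, self-contained sketch of defining one such flag and copying it into a config struct, with made-up names where the diff does not show them.

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

// demoGlobalFlags stands in for the GlobalFlags struct extended in the diff.
type demoGlobalFlags struct {
	BlobBatchLimit int
}

func main() {
	blobBatchLimit := &cli.IntFlag{
		Name:  "blob-batch-limit",
		Usage: "Max number of blob slots to request in a batch.",
		Value: 16,
	}
	app := &cli.App{
		Flags: []cli.Flag{blobBatchLimit},
		Action: func(ctx *cli.Context) error {
			// Mirrors ConfigureGlobalFlags: copy the parsed value into the config.
			cfg := demoGlobalFlags{BlobBatchLimit: ctx.Int(blobBatchLimit.Name)}
			fmt.Println("blob batch limit:", cfg.BlobBatchLimit)
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```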
@@ -57,7 +57,7 @@ var appFlags = []cli.Flag{
 	flags.SetGCPercent,
 	flags.BlockBatchLimit,
 	flags.BlockBatchLimitBurstFactor,
-	flags.BlobBatchLimit,
+	flags.BlobServingBatchLimit,
 	flags.BlobBatchLimitBurstFactor,
 	flags.InteropMockEth1DataVotesFlag,
 	flags.InteropNumValidatorsFlag,

@@ -114,6 +114,7 @@ var appHelpFlagGroups = []flagGroup{
 		flags.BlockBatchLimit,
 		flags.BlockBatchLimitBurstFactor,
 		flags.BlobBatchLimit,
+		flags.BlobServingBatchLimit,
 		flags.BlobBatchLimitBurstFactor,
 		flags.EnableDebugRPCEndpoints,
 		flags.SubscribeToAllSubnets,

@@ -269,6 +269,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
 		fmt.Sprintf("--%s=%d", flags.BlockBatchLimitBurstFactor.Name, 8),
 		fmt.Sprintf("--%s=%d", flags.BlobBatchLimitBurstFactor.Name, 16),
 		fmt.Sprintf("--%s=%d", flags.BlobBatchLimit.Name, 256),
+		fmt.Sprintf("--%s=%d", flags.BlobServingBatchLimit.Name, 256),
 		fmt.Sprintf("--%s=%s", cmdshared.ChainConfigFileFlag.Name, cfgPath),
 		"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=1",
 		"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=2",