PeerDAS: add data column batch config (#14122)

This commit is contained in:
Francis Li
2024-06-21 04:28:34 -07:00
committed by Manu NALEPA
parent 6eb56a9aa1
commit 5cf08b4c0e
7 changed files with 53 additions and 20 deletions

View File

@@ -7,13 +7,14 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
)
const defaultBurstLimit = 5
@@ -43,9 +44,13 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
allowedBlocksPerSecond := float64(flags.Get().BlockBatchLimit)
allowedBlocksBurst := int64(flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit)
// Initialize blob limits.
allowedBlobsPerSecond := float64(flags.Get().BlobBatchLimit)
allowedBlobsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit)
// Initialize data column limits.
allowedDataColumnsPerSecond := float64(flags.Get().BlobBatchLimit * int(params.BeaconConfig().CustodyRequirement))
allowedDataColumnsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit * int(params.BeaconConfig().CustodyRequirement))
allowedDataColumnsPerSecond := float64(flags.Get().DataColumnBatchLimit * int(params.BeaconConfig().CustodyRequirement))
allowedDataColumnsBurst := int64(flags.Get().DataColumnBatchLimitBurstFactor * flags.Get().BlobBatchLimit * int(params.BeaconConfig().CustodyRequirement))
// Set topic map for all rpc topics.
topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings))
@@ -65,7 +70,8 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
blockCollectorV2 := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, blockBucketPeriod, false /* deleteEmptyBuckets */)
// for BlobSidecarsByRoot and BlobSidecarsByRange
blobCollector := leakybucket.NewCollector(allowedDataColumnsPerSecond, allowedDataColumnsBurst, blockBucketPeriod, false)
blobCollector := leakybucket.NewCollector(allowedBlobsPerSecond, allowedBlobsBurst, blockBucketPeriod, false)
// for DataColumnSidecarsByRoot and DataColumnSidecarsByRange
columnCollector := leakybucket.NewCollector(allowedDataColumnsPerSecond, allowedDataColumnsBurst, blockBucketPeriod, false)

View File

@@ -58,8 +58,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
requestedColumnsList = append(requestedColumnsList, ident.ColumnIndex)
}
// TODO: Customize data column batches too
batchSize := flags.Get().BlobBatchLimit
batchSize := flags.Get().DataColumnBatchLimit
var ticker *time.Ticker
if len(requestedColumnIdents) > batchSize {
ticker = time.NewTicker(time.Second)

View File

@@ -3,9 +3,10 @@
package flags
import (
"github.com/urfave/cli/v2"
"github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/urfave/cli/v2"
)
var (
@@ -191,6 +192,19 @@ var (
Usage: "The factor by which blob batch limit may increase on burst.",
Value: 2,
}
// DataColumnBatchLimit specifies the requested data column batch size.
DataColumnBatchLimit = &cli.IntFlag{
Name: "data-column-batch-limit",
Usage: "The amount of data columns the local peer is bounded to request and respond to in a batch.",
// TODO: determine a good default value for this flag.
Value: 128,
}
// DataColumnBatchLimitBurstFactor specifies the factor by which data column batch size may increase.
DataColumnBatchLimitBurstFactor = &cli.IntFlag{
Name: "data-column-batch-limit-burst-factor",
Usage: "The factor by which data column batch limit may increase on burst.",
Value: 2,
}
// DisableDebugRPCEndpoints disables the debug Beacon API namespace.
DisableDebugRPCEndpoints = &cli.BoolFlag{
Name: "disable-debug-rpc-endpoints",

View File

@@ -1,21 +1,24 @@
package flags
import (
"github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/urfave/cli/v2"
"github.com/prysmaticlabs/prysm/v5/cmd"
)
// GlobalFlags specifies all the global flags for the
// beacon node.
type GlobalFlags struct {
SubscribeToAllSubnets bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
BlockBatchLimit int
BlockBatchLimitBurstFactor int
BlobBatchLimit int
BlobBatchLimitBurstFactor int
SubscribeToAllSubnets bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
BlockBatchLimit int
BlockBatchLimitBurstFactor int
BlobBatchLimit int
BlobBatchLimitBurstFactor int
DataColumnBatchLimit int
DataColumnBatchLimitBurstFactor int
}
var globalConfig *GlobalFlags
@@ -45,6 +48,8 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name)
cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
cfg.DataColumnBatchLimit = ctx.Int(DataColumnBatchLimit.Name)
cfg.DataColumnBatchLimitBurstFactor = ctx.Int(DataColumnBatchLimitBurstFactor.Name)
cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name)
configureMinimumPeers(ctx, cfg)

View File

@@ -12,6 +12,9 @@ import (
golog "github.com/ipfs/go-log/v2"
joonix "github.com/joonix/log"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/node"
"github.com/prysmaticlabs/prysm/v5/cmd"
@@ -35,8 +38,6 @@ import (
_ "github.com/prysmaticlabs/prysm/v5/runtime/maxprocs"
"github.com/prysmaticlabs/prysm/v5/runtime/tos"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
var appFlags = []cli.Flag{
@@ -60,6 +61,8 @@ var appFlags = []cli.Flag{
flags.BlockBatchLimitBurstFactor,
flags.BlobBatchLimit,
flags.BlobBatchLimitBurstFactor,
flags.DataColumnBatchLimit,
flags.DataColumnBatchLimitBurstFactor,
flags.InteropMockEth1DataVotesFlag,
flags.InteropNumValidatorsFlag,
flags.InteropGenesisTimeFlag,

View File

@@ -5,6 +5,8 @@ import (
"io"
"sort"
"github.com/urfave/cli/v2"
"github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/storage"
@@ -13,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/sync/genesis"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/runtime/debug"
"github.com/urfave/cli/v2"
)
var appHelpTemplate = `NAME:
@@ -116,6 +117,8 @@ var appHelpFlagGroups = []flagGroup{
flags.BlockBatchLimitBurstFactor,
flags.BlobBatchLimit,
flags.BlobBatchLimitBurstFactor,
flags.DataColumnBatchLimit,
flags.DataColumnBatchLimitBurstFactor,
flags.DisableDebugRPCEndpoints,
flags.SubscribeToAllSubnets,
flags.HistoricalSlasherNode,

View File

@@ -14,6 +14,7 @@ import (
"github.com/bazelbuild/rules_go/go/tools/bazel"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
cmdshared "github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
@@ -270,6 +271,8 @@ func (node *BeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%d", flags.BlockBatchLimitBurstFactor.Name, 8),
fmt.Sprintf("--%s=%d", flags.BlobBatchLimitBurstFactor.Name, 16),
fmt.Sprintf("--%s=%d", flags.BlobBatchLimit.Name, 256),
fmt.Sprintf("--%s=%d", flags.DataColumnBatchLimit.Name, 128),
fmt.Sprintf("--%s=%d", flags.DataColumnBatchLimitBurstFactor.Name, 2),
fmt.Sprintf("--%s=%s", cmdshared.ChainConfigFileFlag.Name, cfgPath),
"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=1",
"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=2",