Add Gossipsub Queue Flag (#13237)

* add it

* remove var

* fix tests

* terence's comments

---------

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das
2023-11-30 14:22:59 +08:00
committed by GitHub
parent 56c1f9aab5
commit 1c35b66132
7 changed files with 26 additions and 6 deletions

View File

@@ -570,6 +570,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name),
UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name),
MaxPeers: cliCtx.Uint(cmd.P2PMaxPeers.Name),
QueueSize: cliCtx.Uint(cmd.PubsubQueueSize.Name),
AllowListCIDR: cliCtx.String(cmd.P2PAllowList.Name),
DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)),
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),

View File

@@ -6,6 +6,9 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
)
// This is the default queue size used if we have specified an invalid one.
const defaultPubsubQueueSize = 600
// Config for the p2p service. These parameters are set from application level flags
// to initialize the p2p service.
type Config struct {
@@ -25,9 +28,20 @@ type Config struct {
TCPPort uint
UDPPort uint
MaxPeers uint
QueueSize uint
AllowListCIDR string
DenyListCIDR []string
StateNotifier statefeed.Notifier
DB db.ReadOnlyDatabase
ClockWaiter startup.ClockWaiter
}
// validateConfig validates whether the values provided are accurate and will set
// the appropriate values for those that are invalid.
func validateConfig(cfg *Config) *Config {
if cfg.QueueSize == 0 {
log.Warnf("Invalid pubsub queue size of %d initialized, setting the quese size as %d instead", cfg.QueueSize, defaultPubsubQueueSize)
cfg.QueueSize = defaultPubsubQueueSize
}
return cfg
}

View File

@@ -139,9 +139,9 @@ func (s *Service) pubsubOptions() []pubsub.Option {
return MsgID(s.genesisValidatorsRoot, pmsg)
}),
pubsub.WithSubscriptionFilter(s),
pubsub.WithPeerOutboundQueueSize(pubsubQueueSize),
pubsub.WithPeerOutboundQueueSize(int(s.cfg.QueueSize)),
pubsub.WithMaxMessageSize(int(params.BeaconNetworkConfig().GossipMaxSizeBellatrix)),
pubsub.WithValidateQueueSize(pubsubQueueSize),
pubsub.WithValidateQueueSize(int(s.cfg.QueueSize)),
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),

View File

@@ -48,10 +48,6 @@ var refreshRate = slots.DivideSlotBy(2)
// maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it.
const maxBadResponses = 5
// pubsubQueueSize is the size that we assign to our validation queue and outbound message queue for
// gossipsub.
const pubsubQueueSize = 600
// maxDialTimeout is the timeout for a single peer dial.
var maxDialTimeout = params.BeaconNetworkConfig().RespTimeout
@@ -98,6 +94,8 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
subnetsLock: make(map[uint64]*sync.RWMutex),
}
s.cfg = validateConfig(s.cfg)
dv5Nodes := parseBootStrapAddrs(s.cfg.BootstrapNodeAddr)
cfg.Discv5BootStrapAddr = dv5Nodes

View File

@@ -97,6 +97,7 @@ var appFlags = []cli.Flag{
cmd.P2PMetadata,
cmd.P2PAllowList,
cmd.P2PDenyList,
cmd.PubsubQueueSize,
cmd.DataDirFlag,
cmd.VerbosityFlag,
cmd.EnableTracingFlag,

View File

@@ -157,6 +157,7 @@ var appHelpFlagGroups = []flagGroup{
cmd.P2PMetadata,
cmd.P2PAllowList,
cmd.P2PDenyList,
cmd.PubsubQueueSize,
cmd.StaticPeers,
cmd.EnableUPnPFlag,
flags.MinSyncPeers,

View File

@@ -177,6 +177,11 @@ var (
"192.168.0.0/16 would deny connections from peers on your local network only. The " +
"default is to accept all connections.",
}
PubsubQueueSize = &cli.IntFlag{
Name: "pubsub-queue-size",
Usage: "The size of the pubsub validation and outbound queue for the node.",
Value: 600,
}
// ForceClearDB removes any previously stored data at the data directory.
ForceClearDB = &cli.BoolFlag{
Name: "force-clear-db",