diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 319486c886..cdc427fcfe 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -570,6 +570,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error { TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name), UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name), MaxPeers: cliCtx.Uint(cmd.P2PMaxPeers.Name), + QueueSize: cliCtx.Uint(cmd.PubsubQueueSize.Name), AllowListCIDR: cliCtx.String(cmd.P2PAllowList.Name), DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)), EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name), diff --git a/beacon-chain/p2p/config.go b/beacon-chain/p2p/config.go index bb18dd8da6..8f6c0f8c3f 100644 --- a/beacon-chain/p2p/config.go +++ b/beacon-chain/p2p/config.go @@ -6,6 +6,9 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" ) +// This is the default queue size used if we have specified an invalid one. +const defaultPubsubQueueSize = 600 + // Config for the p2p service. These parameters are set from application level flags // to initialize the p2p service. type Config struct { @@ -25,9 +28,20 @@ type Config struct { TCPPort uint UDPPort uint MaxPeers uint + QueueSize uint AllowListCIDR string DenyListCIDR []string StateNotifier statefeed.Notifier DB db.ReadOnlyDatabase ClockWaiter startup.ClockWaiter } + +// validateConfig validates whether the values provided are accurate and will set +// the appropriate values for those that are invalid. +func validateConfig(cfg *Config) *Config { + if cfg.QueueSize == 0 { + log.Warnf("Invalid pubsub queue size of %d initialized, setting the quese size as %d instead", cfg.QueueSize, defaultPubsubQueueSize) + cfg.QueueSize = defaultPubsubQueueSize + } + return cfg +} diff --git a/beacon-chain/p2p/pubsub.go b/beacon-chain/p2p/pubsub.go index 47d649db05..6ba53d8f0f 100644 --- a/beacon-chain/p2p/pubsub.go +++ b/beacon-chain/p2p/pubsub.go @@ -139,9 +139,9 @@ func (s *Service) pubsubOptions() []pubsub.Option { return MsgID(s.genesisValidatorsRoot, pmsg) }), pubsub.WithSubscriptionFilter(s), - pubsub.WithPeerOutboundQueueSize(pubsubQueueSize), + pubsub.WithPeerOutboundQueueSize(int(s.cfg.QueueSize)), pubsub.WithMaxMessageSize(int(params.BeaconNetworkConfig().GossipMaxSizeBellatrix)), - pubsub.WithValidateQueueSize(pubsubQueueSize), + pubsub.WithValidateQueueSize(int(s.cfg.QueueSize)), pubsub.WithPeerScore(peerScoringParams()), pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute), pubsub.WithGossipSubParams(pubsubGossipParam()), diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 2b8aa3e7f2..a3b80ca116 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -48,10 +48,6 @@ var refreshRate = slots.DivideSlotBy(2) // maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it. const maxBadResponses = 5 -// pubsubQueueSize is the size that we assign to our validation queue and outbound message queue for -// gossipsub. -const pubsubQueueSize = 600 - // maxDialTimeout is the timeout for a single peer dial. var maxDialTimeout = params.BeaconNetworkConfig().RespTimeout @@ -98,6 +94,8 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { subnetsLock: make(map[uint64]*sync.RWMutex), } + s.cfg = validateConfig(s.cfg) + dv5Nodes := parseBootStrapAddrs(s.cfg.BootstrapNodeAddr) cfg.Discv5BootStrapAddr = dv5Nodes diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index 8c70cbd9cd..29826f7519 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -97,6 +97,7 @@ var appFlags = []cli.Flag{ cmd.P2PMetadata, cmd.P2PAllowList, cmd.P2PDenyList, + cmd.PubsubQueueSize, cmd.DataDirFlag, cmd.VerbosityFlag, cmd.EnableTracingFlag, diff --git a/cmd/beacon-chain/usage.go b/cmd/beacon-chain/usage.go index cad914bf37..ab7835af59 100644 --- a/cmd/beacon-chain/usage.go +++ b/cmd/beacon-chain/usage.go @@ -157,6 +157,7 @@ var appHelpFlagGroups = []flagGroup{ cmd.P2PMetadata, cmd.P2PAllowList, cmd.P2PDenyList, + cmd.PubsubQueueSize, cmd.StaticPeers, cmd.EnableUPnPFlag, flags.MinSyncPeers, diff --git a/cmd/flags.go b/cmd/flags.go index b2729d2a77..081c937eec 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -177,6 +177,11 @@ var ( "192.168.0.0/16 would deny connections from peers on your local network only. The " + "default is to accept all connections.", } + PubsubQueueSize = &cli.IntFlag{ + Name: "pubsub-queue-size", + Usage: "The size of the pubsub validation and outbound queue for the node.", + Value: 600, + } // ForceClearDB removes any previously stored data at the data directory. ForceClearDB = &cli.BoolFlag{ Name: "force-clear-db",