Configurable pubsub router (#5499)

* A p2p flag for configuring pubsub router
* Add randomsub
* lint
* Merge refs/heads/master into configurable-pubsub-router
* Default to gossip
* Merge branch 'configurable-pubsub-router' of github.com:prysmaticlabs/prysm into configurable-pubsub-router
* Merge refs/heads/master into configurable-pubsub-router
* Add flag to usage
* Merge branch 'configurable-pubsub-router' of github.com:prysmaticlabs/prysm into configurable-pubsub-router
* Fix build
* Merge refs/heads/master into configurable-pubsub-router
This commit is contained in:
Preston Van Loon
2020-04-18 18:53:32 -07:00
committed by GitHub
parent 62fa6ed2e9
commit 0ed0cb58f8
6 changed files with 31 additions and 1 deletions

View File

@@ -63,6 +63,7 @@ var appFlags = []cli.Flag{
cmd.P2PMetadata,
cmd.P2PWhitelist,
cmd.P2PEncoding,
cmd.P2PPubsub,
cmd.DataDirFlag,
cmd.VerbosityFlag,
cmd.EnableTracingFlag,

View File

@@ -312,6 +312,7 @@ func (b *BeaconNode) registerP2P(ctx *cli.Context) error {
DisableDiscv5: ctx.Bool(flags.DisableDiscv5.Name),
Encoding: ctx.String(cmd.P2PEncoding.Name),
StateNotifier: b,
PubSub: ctx.String(cmd.P2PPubsub.Name),
})
if err != nil {
return err

View File

@@ -27,4 +27,5 @@ type Config struct {
WhitelistCIDR string
Encoding string
StateNotifier statefeed.Notifier
PubSub string
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/ecdsa"
"fmt"
"strconv"
"strings"
"time"
@@ -56,6 +57,12 @@ const prysmProtocolPrefix = "/prysm/0.0.0"
// maxBadResponses is the maximum number of bad responses from a peer before we stop talking to it.
const maxBadResponses = 3
const (
pubsubFlood = "flood"
pubsubGossip = "gossip"
pubsubRandom = "random"
)
// Service for managing peer to peer (p2p) networking.
type Service struct {
started bool
@@ -153,7 +160,20 @@ func NewService(cfg *Config) (*Service, error) {
pubsub.WithStrictSignatureVerification(false),
pubsub.WithMessageIdFn(msgIDFunction),
}
gs, err := pubsub.NewGossipSub(s.ctx, s.host, psOpts...)
var gs *pubsub.PubSub
if cfg.PubSub == "" {
cfg.PubSub = pubsubGossip
}
if cfg.PubSub == pubsubFlood {
gs, err = pubsub.NewFloodSub(s.ctx, s.host, psOpts...)
} else if cfg.PubSub == pubsubGossip {
gs, err = pubsub.NewGossipSub(s.ctx, s.host, psOpts...)
} else if cfg.PubSub == pubsubRandom {
gs, err = pubsub.NewRandomSub(s.ctx, s.host, psOpts...)
} else {
return nil, fmt.Errorf("unknown pubsub type %s", cfg.PubSub)
}
if err != nil {
log.WithError(err).Error("Failed to start pubsub")
return nil, err

View File

@@ -108,6 +108,7 @@ var appHelpFlagGroups = []flagGroup{
cmd.StaticPeers,
cmd.EnableUPnPFlag,
cmd.P2PEncoding,
cmd.P2PPubsub,
flags.MinSyncPeers,
},
},

View File

@@ -137,6 +137,12 @@ var (
Usage: "The encoding format of messages sent over the wire. The default is 0, which represents ssz",
Value: "ssz-snappy",
}
// P2PPubsub defines the pubsub router to use for p2p messages.
P2PPubsub = &cli.StringFlag{
Name: "p2p-pubsub",
Usage: "The name of the pubsub router to use. Supported values are: gossip, flood, random",
Value: "gossip",
}
// ForceClearDB removes any previously stored data at the data directory.
ForceClearDB = &cli.BoolFlag{
Name: "force-clear-db",