Add flag for testing new p2p (#3243)

* refactor a bit to select p2p

* lint

* fix build

* fix build

* fix build

* fix build

* fix build
This commit is contained in:
Preston Van Loon
2019-08-19 17:20:56 -04:00
committed by GitHub
parent b7b62e24ad
commit 0b8cbd06b6
40 changed files with 250 additions and 100 deletions

View File

@@ -18,11 +18,11 @@ go_library(
"//beacon-chain/core/validators:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/operations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/deprecated-p2p:go_default_library",
"//shared/event:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_pkg_errors//:go_default_library",
@@ -53,12 +53,12 @@ go_test(
"//beacon-chain/core/validators:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/internal:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/deprecated-p2p:go_default_library",
"//shared/event:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",

View File

@@ -17,10 +17,10 @@ import (
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
p2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"

View File

@@ -17,9 +17,9 @@ import (
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
p2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/sirupsen/logrus"
@@ -139,8 +139,9 @@ type mockBroadcaster struct {
broadcastCalled bool
}
func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) {
func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error {
mb.broadcastCalled = true
return nil
}
var _ = p2p.Broadcaster(&mockBroadcaster{})

View File

@@ -9,6 +9,7 @@ go_library(
"regular_sync.go",
"service.go",
],
deprecation = "Use github.com/prysmaticlabs/prysm/sync",
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/deprecated-sync",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
@@ -17,6 +18,7 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/deprecated-sync/initial-sync:go_default_library",
"//beacon-chain/operations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",

View File

@@ -9,6 +9,7 @@ go_library(
"sync_blocks.go",
"sync_state.go",
],
deprecation = "Use github.com/prysmaticlabs/prysm/sync/initial-sync",
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/deprecated-sync/initial-sync",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
@@ -16,6 +17,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/validators:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",

View File

@@ -23,10 +23,11 @@ import (
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
deprecatedp2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
@@ -67,8 +68,7 @@ func DefaultConfig() *Config {
type p2pAPI interface {
p2p.Sender
p2p.ReputationManager
p2p.Subscriber
p2p.DeprecatedSubscriber
}
type powChainService interface {
@@ -97,8 +97,8 @@ type InitialSync struct {
chainService chainService
db *db.BeaconDB
powchain powChainService
batchedBlockBuf chan p2p.Message
stateBuf chan p2p.Message
batchedBlockBuf chan deprecatedp2p.Message
stateBuf chan deprecatedp2p.Message
syncPollingInterval time.Duration
syncedFeed *event.Feed
stateReceived bool
@@ -113,8 +113,8 @@ func NewInitialSyncService(ctx context.Context,
) *InitialSync {
ctx, cancel := context.WithCancel(ctx)
stateBuf := make(chan p2p.Message, cfg.StateBufferSize)
batchedBlockBuf := make(chan p2p.Message, cfg.BatchedBlockBufferSize)
stateBuf := make(chan deprecatedp2p.Message, cfg.StateBufferSize)
batchedBlockBuf := make(chan deprecatedp2p.Message, cfg.BatchedBlockBufferSize)
return &InitialSync{
ctx: ctx,
@@ -290,13 +290,11 @@ func (s *InitialSync) syncToPeer(ctx context.Context, chainHeadResponse *pb.Chai
log.WithFields(fields).Info("Received batched blocks from peer")
if err := s.processBatchedBlocks(msg, chainHeadResponse); err != nil {
log.WithError(err).WithField("peer", peer).Error("Failed to sync with peer.")
s.p2p.Reputation(msg.Peer, p2p.RepPenalityInitialSyncFailure)
continue
}
if !s.nodeIsSynced {
return errors.New("node still not in sync after receiving batch blocks")
}
s.p2p.Reputation(msg.Peer, p2p.RepRewardValidBlock)
return nil
}
}

View File

@@ -50,7 +50,6 @@ func (s *InitialSync) processBatchedBlocks(msg p2p.Message, chainHead *pb.ChainH
batchedBlocks := response.BatchedBlocks
if len(batchedBlocks) == 0 {
// Do not process empty responses.
s.p2p.Reputation(msg.Peer, p2p.RepPenalityInitialSyncFailure)
return nil
}

View File

@@ -10,7 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
deprecatedp2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
@@ -55,7 +55,7 @@ type Querier struct {
currentHeadSlot uint64
currentStateRoot []byte
currentFinalizedStateRoot [32]byte
responseBuf chan p2p.Message
responseBuf chan deprecatedp2p.Message
chainStartBuf chan time.Time
powchain powChainService
chainStarted bool
@@ -73,7 +73,7 @@ func NewQuerierService(ctx context.Context,
) *Querier {
ctx, cancel := context.WithCancel(ctx)
responseBuf := make(chan p2p.Message, cfg.ResponseBufferSize)
responseBuf := make(chan deprecatedp2p.Message, cfg.ResponseBufferSize)
return &Querier{
ctx: ctx,

View File

@@ -202,10 +202,8 @@ func (rs *RegularSync) validateAndProcessBlock(
if err := rs.chainService.ApplyForkChoiceRuleDeprecated(ctx, block, beaconState); err != nil {
log.WithError(err).Error("Could not run fork choice on block")
rs.p2p.Reputation(blockMsg.Peer, p2p.RepPenalityInvalidBlock)
return nil, nil, false, err
}
rs.p2p.Reputation(blockMsg.Peer, p2p.RepRewardValidBlock)
sentBlocks.Inc()
// We update the last observed slot to the received canonical block's slot.
if block.Slot > rs.highestObservedSlot {

View File

@@ -17,10 +17,11 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
p2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
deprecatedp2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/logutil"
@@ -51,8 +52,7 @@ type attsService interface {
type p2pAPI interface {
p2p.Broadcaster
p2p.Sender
p2p.Subscriber
p2p.ReputationManager
p2p.DeprecatedSubscriber
}
// RegularSync is the gateway and the bridge between the p2p network and the local beacon chain.
@@ -76,17 +76,17 @@ type RegularSync struct {
operationsService operations.OperationFeeds
db *db.BeaconDB
blockAnnouncementFeed *event.Feed
announceBlockBuf chan p2p.Message
blockBuf chan p2p.Message
blockRequestByHash chan p2p.Message
batchedRequestBuf chan p2p.Message
stateRequestBuf chan p2p.Message
chainHeadReqBuf chan p2p.Message
attestationBuf chan p2p.Message
exitBuf chan p2p.Message
announceBlockBuf chan deprecatedp2p.Message
blockBuf chan deprecatedp2p.Message
blockRequestByHash chan deprecatedp2p.Message
batchedRequestBuf chan deprecatedp2p.Message
stateRequestBuf chan deprecatedp2p.Message
chainHeadReqBuf chan deprecatedp2p.Message
attestationBuf chan deprecatedp2p.Message
exitBuf chan deprecatedp2p.Message
canonicalBuf chan *pb.BeaconBlockAnnounce
highestObservedSlot uint64
blocksAwaitingProcessing map[[32]byte]p2p.Message
blocksAwaitingProcessing map[[32]byte]deprecatedp2p.Message
blocksAwaitingProcessingLock sync.RWMutex
blockProcessingLock sync.RWMutex
blockAnnouncements map[uint64][]byte
@@ -138,16 +138,16 @@ func NewRegularSyncService(ctx context.Context, cfg *RegularSyncConfig) *Regular
operationsService: cfg.OperationService,
attsService: cfg.AttsService,
blockAnnouncementFeed: new(event.Feed),
announceBlockBuf: make(chan p2p.Message, cfg.BlockAnnounceBufferSize),
blockBuf: make(chan p2p.Message, cfg.BlockBufferSize),
blockRequestByHash: make(chan p2p.Message, cfg.BlockReqHashBufferSize),
batchedRequestBuf: make(chan p2p.Message, cfg.BatchedBufferSize),
stateRequestBuf: make(chan p2p.Message, cfg.StateReqBufferSize),
attestationBuf: make(chan p2p.Message, cfg.AttestationBufferSize),
exitBuf: make(chan p2p.Message, cfg.ExitBufferSize),
chainHeadReqBuf: make(chan p2p.Message, cfg.ChainHeadReqBufferSize),
announceBlockBuf: make(chan deprecatedp2p.Message, cfg.BlockAnnounceBufferSize),
blockBuf: make(chan deprecatedp2p.Message, cfg.BlockBufferSize),
blockRequestByHash: make(chan deprecatedp2p.Message, cfg.BlockReqHashBufferSize),
batchedRequestBuf: make(chan deprecatedp2p.Message, cfg.BatchedBufferSize),
stateRequestBuf: make(chan deprecatedp2p.Message, cfg.StateReqBufferSize),
attestationBuf: make(chan deprecatedp2p.Message, cfg.AttestationBufferSize),
exitBuf: make(chan deprecatedp2p.Message, cfg.ExitBufferSize),
chainHeadReqBuf: make(chan deprecatedp2p.Message, cfg.ChainHeadReqBufferSize),
canonicalBuf: make(chan *pb.BeaconBlockAnnounce, cfg.CanonicalBufferSize),
blocksAwaitingProcessing: make(map[[32]byte]p2p.Message),
blocksAwaitingProcessing: make(map[[32]byte]deprecatedp2p.Message),
blockAnnouncements: make(map[uint64][]byte),
}
}
@@ -228,7 +228,7 @@ func (rs *RegularSync) run() {
// safelyHandleMessage will recover and log any panic that occurs from the
// function argument.
func safelyHandleMessage(fn func(p2p.Message) error, msg p2p.Message) {
func safelyHandleMessage(fn func(deprecatedp2p.Message) error, msg deprecatedp2p.Message) {
defer func() {
if r := recover(); r != nil {
printedMsg := "message contains no data"
@@ -273,7 +273,7 @@ func safelyHandleMessage(fn func(p2p.Message) error, msg p2p.Message) {
}
}
func (rs *RegularSync) handleStateRequest(msg p2p.Message) error {
func (rs *RegularSync) handleStateRequest(msg deprecatedp2p.Message) error {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleStateRequest")
defer span.End()
stateReq.Inc()
@@ -322,7 +322,7 @@ func (rs *RegularSync) handleStateRequest(msg p2p.Message) error {
return nil
}
func (rs *RegularSync) handleChainHeadRequest(msg p2p.Message) error {
func (rs *RegularSync) handleChainHeadRequest(msg deprecatedp2p.Message) error {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleChainHeadRequest")
defer span.End()
chainHeadReq.Inc()
@@ -382,7 +382,7 @@ func (rs *RegularSync) handleChainHeadRequest(msg p2p.Message) error {
// receiveAttestation accepts an broadcasted attestation from the p2p layer,
// discard the attestation if we have gotten before, send it to attestation
// pool if we have not.
func (rs *RegularSync) receiveAttestation(msg p2p.Message) error {
func (rs *RegularSync) receiveAttestation(msg deprecatedp2p.Message) error {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveAttestation")
defer span.End()
recAttestation.Inc()
@@ -438,7 +438,6 @@ func (rs *RegularSync) receiveAttestation(msg p2p.Message) error {
log.Debug("Sending newly received attestation to subscribers")
rs.operationsService.IncomingAttFeed().Send(attestation)
rs.attsService.IncomingAttestationFeed().Send(attestation)
rs.p2p.Reputation(msg.Peer, p2p.RepRewardValidAttestation)
sentAttestation.Inc()
sendAttestationSpan.End()
return nil
@@ -447,7 +446,7 @@ func (rs *RegularSync) receiveAttestation(msg p2p.Message) error {
// receiveExitRequest accepts an broadcasted exit from the p2p layer,
// discard the exit if we have gotten before, send it to operation
// service if we have not.
func (rs *RegularSync) receiveExitRequest(msg p2p.Message) error {
func (rs *RegularSync) receiveExitRequest(msg deprecatedp2p.Message) error {
_, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveExitRequest")
defer span.End()
recExit.Inc()
@@ -472,7 +471,7 @@ func (rs *RegularSync) receiveExitRequest(msg p2p.Message) error {
return nil
}
func (rs *RegularSync) handleBlockRequestByHash(msg p2p.Message) error {
func (rs *RegularSync) handleBlockRequestByHash(msg deprecatedp2p.Message) error {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBlockRequestByHash")
defer span.End()
blockReqHash.Inc()
@@ -500,7 +499,7 @@ func (rs *RegularSync) handleBlockRequestByHash(msg p2p.Message) error {
// handleBatchedBlockRequest receives p2p messages which consist of requests for batched blocks
// which are bounded by a start slot and end slot.
func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) error {
func (rs *RegularSync) handleBatchedBlockRequest(msg deprecatedp2p.Message) error {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBatchedBlockRequest")
defer span.End()
batchedBlockReq.Inc()

View File

@@ -40,7 +40,8 @@ func (mp *mockP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.
return new(event.Feed).Subscribe(channel)
}
func (mp *mockP2P) Broadcast(ctx context.Context, msg proto.Message) {
func (mp *mockP2P) Broadcast(ctx context.Context, msg proto.Message) error {
return nil
}
func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error {

View File

@@ -17,6 +17,7 @@ go_library(
"//beacon-chain/flags:go_default_library",
"//beacon-chain/gateway:go_default_library",
"//beacon-chain/operations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//beacon-chain/rpc:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",

View File

@@ -21,12 +21,13 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/gateway"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/beacon-chain/rpc"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/debug"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
deprecatedp2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/prometheus"
@@ -96,7 +97,7 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}
if err := beacon.registerOperationService(); err != nil {
if err := beacon.registerOperationService(ctx); err != nil {
return nil, err
}
@@ -192,14 +193,49 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error {
}
func (b *BeaconNode) registerP2P(ctx *cli.Context) error {
if featureconfig.FeatureConfig().UseNewP2P {
svc, err := p2p.NewService(&p2p.Config{
NoDiscovery: ctx.GlobalBool(cmd.NoDiscovery.Name),
StaticPeers: ctx.GlobalStringSlice(cmd.StaticPeers.Name),
BootstrapNodeAddr: ctx.GlobalString(cmd.BootstrapNode.Name),
RelayNodeAddr: ctx.GlobalString(cmd.RelayNode.Name),
HostAddress: ctx.GlobalString(cmd.P2PHost.Name),
PrivateKey: ctx.GlobalString(cmd.P2PPrivKey.Name),
Port: ctx.GlobalUint(cmd.P2PPort.Name),
MaxPeers: ctx.GlobalUint(cmd.P2PMaxPeers.Name),
WhitelistCIDR: ctx.GlobalString(cmd.P2PWhitelist.Name),
EnableUPnP: ctx.GlobalBool(cmd.EnableUPnPFlag.Name),
})
if err != nil {
return err
}
return b.services.RegisterService(svc)
}
beaconp2p, err := deprecatedConfigureP2P(ctx)
if err != nil {
return errors.Wrap(err, "could not register p2p service")
return errors.Wrap(err, "could not register deprecatedp2p service")
}
return b.services.RegisterService(beaconp2p)
}
func (b *BeaconNode) fetchP2P(ctx *cli.Context) p2p.P2P {
if featureconfig.FeatureConfig().UseNewP2P {
var p *p2p.Service
if err := b.services.FetchService(&p); err != nil {
panic(err)
}
return p
}
var p *deprecatedp2p.Server
if err := b.services.FetchService(&p); err != nil {
panic(err)
}
return p
}
func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error {
var web3Service *powchain.Web3Service
if err := b.services.FetchService(&web3Service); err != nil {
@@ -213,10 +249,6 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error {
if err := b.services.FetchService(&attsService); err != nil {
return err
}
var p2pService *p2p.Server
if err := b.services.FetchService(&p2pService); err != nil {
return err
}
maxRoutines := ctx.GlobalInt64(cmd.MaxGoroutines.Name)
blockchainService, err := blockchain.NewChainService(context.Background(), &blockchain.Config{
@@ -224,7 +256,7 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error {
Web3Service: web3Service,
OpsPoolService: opsService,
AttsService: attsService,
P2p: p2pService,
P2p: b.fetchP2P(ctx),
MaxRoutines: maxRoutines,
})
if err != nil {
@@ -233,15 +265,10 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error {
return b.services.RegisterService(blockchainService)
}
func (b *BeaconNode) registerOperationService() error {
var p2pService *p2p.Server
if err := b.services.FetchService(&p2pService); err != nil {
return err
}
func (b *BeaconNode) registerOperationService(ctx *cli.Context) error {
operationService := operations.NewOpsPoolService(context.Background(), &operations.Config{
BeaconDB: b.db,
P2P: p2pService,
P2P: b.fetchP2P(ctx),
})
return b.services.RegisterService(operationService)
@@ -302,17 +329,12 @@ func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error {
return b.services.RegisterService(web3Service)
}
func (b *BeaconNode) registerSyncService(_ *cli.Context) error {
func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
var chainService *blockchain.ChainService
if err := b.services.FetchService(&chainService); err != nil {
return err
}
var p2pService *p2p.Server
if err := b.services.FetchService(&p2pService); err != nil {
return err
}
var operationService *operations.Service
if err := b.services.FetchService(&operationService); err != nil {
return err
@@ -330,7 +352,7 @@ func (b *BeaconNode) registerSyncService(_ *cli.Context) error {
cfg := &rbcsync.Config{
ChainService: chainService,
P2P: p2pService,
P2P: b.fetchP2P(ctx),
BeaconDB: b.db,
OperationService: operationService,
PowChainService: web3Service,
@@ -347,11 +369,6 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error {
return err
}
var p2pService *p2p.Server
if err := b.services.FetchService(&p2pService); err != nil {
return err
}
var operationService *operations.Service
if err := b.services.FetchService(&operationService); err != nil {
return err
@@ -375,7 +392,7 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error {
CertFlag: cert,
KeyFlag: key,
BeaconDB: b.db,
Broadcaster: p2pService,
Broadcaster: b.fetchP2P(ctx),
ChainService: chainService,
OperationService: operationService,
POWChainService: web3Service,

View File

@@ -10,11 +10,11 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/deprecated-p2p:go_default_library",
"//shared/event:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/messagehandler:go_default_library",

View File

@@ -15,11 +15,11 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
p2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
handler "github.com/prysmaticlabs/prysm/shared/messagehandler"

View File

@@ -31,7 +31,8 @@ var _ = Pool(&Service{})
type mockBroadcaster struct {
}
func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) {
func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error {
return nil
}
func TestStop_OK(t *testing.T) {

View File

@@ -5,11 +5,13 @@ go_library(
srcs = [
"broadcaster.go",
"config.go",
"deprecated.go",
"doc.go",
"gossip_topic_mappings.go",
"handshake.go",
"interfaces.go",
"log.go",
"sender.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p",
@@ -19,6 +21,8 @@ go_library(
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared:go_default_library",
"//shared/deprecated-p2p:go_default_library",
"//shared/event:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p_core//host:go_default_library",

View File

@@ -2,6 +2,7 @@ package p2p
import (
"bytes"
"context"
"reflect"
"github.com/gogo/protobuf/proto"
@@ -13,7 +14,7 @@ import (
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
// Broadcast a message to the p2p network.
func (s *Service) Broadcast(msg proto.Message) error {
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
topic, ok := GossipTypeMapping[reflect.TypeOf(msg)]
if !ok {
return ErrMessageNotMapped

View File

@@ -67,7 +67,7 @@ func TestService_Broadcast(t *testing.T) {
}()
// Broadcast to peers and wait.
if err := p.Broadcast(msg); err != nil {
if err := p.Broadcast(context.Background(), msg); err != nil {
t.Fatal(err)
}
if testutil.WaitTimeout(&wg, 1*time.Second) {
@@ -77,7 +77,7 @@ func TestService_Broadcast(t *testing.T) {
func TestService_Broadcast_ReturnsErr_TopicNotMapped(t *testing.T) {
p := Service{}
if err := p.Broadcast(&testpb.AddressBook{}); err != ErrMessageNotMapped {
if err := p.Broadcast(context.Background(), &testpb.AddressBook{}); err != ErrMessageNotMapped {
t.Fatalf("Expected error %v, got %v", ErrMessageNotMapped, err)
}
}

View File

@@ -0,0 +1,13 @@
package p2p
import (
"github.com/gogo/protobuf/proto"
deprecatedp2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
)
// DeprecatedSubscriber exists for backwards compatibility.
// DEPRECATED: Do not use. This exists for backwards compatibility but may be removed.
type DeprecatedSubscriber interface {
Subscribe(msg proto.Message, channel chan deprecatedp2p.Message) event.Subscription
}

View File

@@ -9,7 +9,10 @@ go_library(
"varint.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder",
visibility = ["//beacon-chain:__subpackages__"],
visibility = [
"//beacon-chain:__subpackages__",
"//shared/deprecated-p2p:__pkg__", # TODO(3147): Remove.
],
deps = [
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_golang_snappy//:go_default_library",

View File

@@ -16,3 +16,8 @@ func (p *Service) AddHandshake(pid peer.ID, hello *pb.Hello) {
defer handshakeLock.Unlock()
handshakes[pid] = hello
}
// Handshakes has not been implemented yet and it may be moved to regular sync...
func (p *Service) Handshakes() map[peer.ID]*pb.Hello {
return nil
}

View File

@@ -1,6 +1,8 @@
package p2p
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
@@ -17,11 +19,13 @@ type P2P interface {
PubSubProvider
PeerManager
HandshakeManager
Sender
DeprecatedSubscriber
}
// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
type Broadcaster interface {
Broadcast(proto.Message) error
Broadcast(context.Context, proto.Message) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.
@@ -48,3 +52,8 @@ type PeerManager interface {
type HandshakeManager interface {
AddHandshake(peer.ID, *pb.Hello)
}
// Sender abstracts the sending functionality from libp2p.
type Sender interface {
Send(context.Context, proto.Message, peer.ID) error
}

View File

@@ -0,0 +1,14 @@
package p2p
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/peer"
)
// Send a message to a specific peer.
// TODO(3147): Implement.
func (s *Service) Send(ctx context.Context, message proto.Message, pid peer.ID) error {
return nil
}

View File

@@ -3,6 +3,7 @@ package p2p
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
network "github.com/libp2p/go-libp2p-core/network"
@@ -12,6 +13,8 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/shared"
deprecatedp2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
)
var _ = shared.Service(&Service{})
@@ -102,3 +105,10 @@ func (s *Service) Disconnect(pid peer.ID) error {
// TODO(3147): Implement disconnect
return nil
}
// Subscribe to some topic.
// TODO(3147): Remove
// DEPRECATED: Do not use.
func (s *Service) Subscribe(_ proto.Message, _ chan deprecatedp2p.Message) event.Subscription {
return nil
}

View File

@@ -9,6 +9,8 @@ go_library(
deps = [
"//beacon-chain/p2p/encoder:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/deprecated-p2p:go_default_library",
"//shared/event:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_blankhost//:go_default_library",
"@com_github_libp2p_go_libp2p_core//host:go_default_library",

View File

@@ -14,9 +14,10 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
deprecatedp2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
)
// TestP2P represents a p2p implementation that can be used for testing.
@@ -108,7 +109,7 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
}
// Broadcast a message.
func (p *TestP2P) Broadcast(msg proto.Message) error {
func (p *TestP2P) Broadcast(ctx context.Context, msg proto.Message) error {
p.BroadcastCalled = true
return nil
}
@@ -138,3 +139,15 @@ func (p *TestP2P) Disconnect(pid peer.ID) error {
func (p *TestP2P) AddHandshake(pid peer.ID, hello *pb.Hello) {
// TODO(3147): add this.
}
// Send a message to a specific peer.
func (p *TestP2P) Send(ctx context.Context, msg proto.Message, pid peer.ID) error {
// TODO(3147): add this.
return nil
}
// Subscribe to some topic. Not implemented.
func (p *TestP2P) Subscribe(msg proto.Message, ch chan deprecatedp2p.Message) event.Subscription {
// TODO(3147): remove this.
return nil
}

View File

@@ -25,11 +25,11 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/deprecated-sync:go_default_library",
"//beacon-chain/operations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/deprecated-p2p:go_default_library",
"//shared/event:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/pagination:go_default_library",

View File

@@ -10,11 +10,11 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
)

View File

@@ -19,7 +19,8 @@ import (
type mockBroadcaster struct{}
func (m *mockBroadcaster) Broadcast(ctx context.Context, msg proto.Message) {
func (m *mockBroadcaster) Broadcast(ctx context.Context, msg proto.Message) error {
return nil
}
func TestSubmitAttestation_OK(t *testing.T) {

View File

@@ -18,10 +18,10 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/db"
sync "github.com/prysmaticlabs/prysm/beacon-chain/deprecated-sync"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
p2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/trieutil"

View File

@@ -38,7 +38,7 @@ func (r *RegularSync) validateVoluntaryExit(ctx context.Context, msg proto.Messa
}
seenExits.Set(exitCacheKey(exit), true /*value*/, 365*24*time.Hour /*TTL*/)
if err := p.Broadcast(exit); err != nil {
if err := p.Broadcast(ctx, exit); err != nil {
log.WithError(err).Error("Failed to propagate voluntary exit")
}
return true

View File

@@ -13,13 +13,16 @@ go_library(
"message.go",
"monitoring.go",
"negotiation.go",
"noop_p2p_compatibility.go",
"options.go",
"p2p.go",
"service.go",
],
deprecation = "Use github.com/prysmaticlabs/prysm/p2p",
importpath = "github.com/prysmaticlabs/prysm/shared/deprecated-p2p",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/p2p/encoder:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/event:go_default_library",
"//shared/featureconfig:go_default_library",
@@ -36,6 +39,8 @@ go_library(
"@com_github_libp2p_go_libp2p//p2p/discovery:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/host/routed:go_default_library",
"@com_github_libp2p_go_libp2p_connmgr//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_libp2p_go_libp2p_crypto//:go_default_library",
"@com_github_libp2p_go_libp2p_host//:go_default_library",
"@com_github_libp2p_go_libp2p_kad_dht//:go_default_library",

View File

@@ -42,8 +42,9 @@ func (s *Server) Reputation(peer peer.ID, val int) {
}
// Disconnect will close all connections to the given peer.
func (s *Server) Disconnect(peer peer.ID) {
func (s *Server) Disconnect(peer peer.ID) error {
if err := s.host.Network().ClosePeer(peer); err != nil {
log.WithError(err).WithField("peer", peer.Pretty()).Error("Failed to close conn with peer")
}
return nil
}

View File

@@ -12,7 +12,7 @@ import (
// for testing or when the calling code only needs access to the broadcast
// method.
type Broadcaster interface {
Broadcast(context.Context, proto.Message)
Broadcast(context.Context, proto.Message) error
}
// Subscriber represents a subset of the p2p.Server. This interface is useful

View File

@@ -9,6 +9,7 @@ import (
)
// Message represents a message received from an external peer.
// DEPRECATED: Do not use. This exists for backwards compatibility but may be removed.
type Message struct {
// Ctx message context.
Ctx context.Context

View File

@@ -0,0 +1,37 @@
package p2p
import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
ethpb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
// This file exists to enable interop/compatibility between this deprecated library and the new
// p2p library. See issue #3147.
// AddHandshake not implemented.
func (s *Server) AddHandshake(_ peer.ID, _ *ethpb.Hello) {
panic("not implemented")
}
// Handshakes not implemented.
func (s *Server) Handshakes() map[peer.ID]*ethpb.Hello {
return nil
}
// Encoding not implemented.
func (s *Server) Encoding() encoder.NetworkEncoding {
return nil
}
// PubSub not implemented.
func (s *Server) PubSub() *pubsub.PubSub {
return s.gsub
}
// SetStreamHandler not implemented.
func (s *Server) SetStreamHandler(_ string, _ network.StreamHandler) {
panic("not implemented")
}

View File

@@ -477,7 +477,7 @@ func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) er
// msg := make(chan p2p.Message, 100) // Choose a reasonable buffer size!
// ps.RegisterTopic("message_topic_here", msg)
// ps.Broadcast(msg)
func (s *Server) Broadcast(ctx context.Context, msg proto.Message) {
func (s *Server) Broadcast(ctx context.Context, msg proto.Message) error {
defer func() {
if r := recover(); r != nil {
log.WithField("r", r).Error("Panicked when broadcasting!")
@@ -510,13 +510,13 @@ func (s *Server) Broadcast(ctx context.Context, msg proto.Message) {
m, ok := msg.(proto.Message)
if !ok {
log.Errorf("Message to broadcast (type: %T) is not a protobuf message: %v", msg, msg)
return
return nil
}
b, err := proto.Marshal(m)
if err != nil {
log.Errorf("Failed to marshal data for broadcast: %v", err)
return
return err
}
envelope := &pb.Envelope{
@@ -528,10 +528,11 @@ func (s *Server) Broadcast(ctx context.Context, msg proto.Message) {
data, err := proto.Marshal(envelope)
if err != nil {
log.Errorf("Failed to marshal data for broadcast: %v", err)
return
return err
}
if err := s.gsub.Publish(topic, data); err != nil {
log.Errorf("Failed to publish to gossipsub topic: %v", err)
}
return nil
}

View File

@@ -29,6 +29,7 @@ type FeatureFlagConfig struct {
DisableGossipSub bool // DisableGossipSub in p2p messaging.
EnableExcessDeposits bool // EnableExcessDeposits in validator balances.
NoGenesisDelay bool // NoGenesisDelay when processing a chain start genesis event.
UseNewP2P bool // UseNewP2P service.
// Cache toggles.
EnableActiveBalanceCache bool // EnableActiveBalanceCache; see https://github.com/prysmaticlabs/prysm/issues/3106.
@@ -71,6 +72,10 @@ func ConfigureBeaconFeatures(ctx *cli.Context) {
log.Warn("Using non standard genesis delay. This may cause problems in a multi-node environment.")
cfg.NoGenesisDelay = true
}
if ctx.GlobalBool(UseNewP2PFlag.Name) {
log.Warn("Using new P2P service.")
cfg.UseNewP2P = true
}
if ctx.GlobalBool(EnableActiveBalanceCacheFlag.Name) {
log.Warn("Enabled unsafe active balance cache")
cfg.EnableActiveBalanceCache = true

View File

@@ -31,6 +31,11 @@ var (
Name: "no-genesis-delay",
Usage: "Process genesis event 30s after the ETH1 block time, rather than wait to midnight of the next day.",
}
// UseNewP2PFlag to start the beacon chain with the new p2p library.
UseNewP2PFlag = cli.BoolFlag{
Name: "experimental-p2p",
Usage: "Use the new experimental p2p library. See issue #3147.",
}
// EnableActiveBalanceCacheFlag see https://github.com/prysmaticlabs/prysm/issues/3106.
EnableActiveBalanceCacheFlag = cli.BoolFlag{
Name: "enable-active-balance-cache",
@@ -78,6 +83,7 @@ var BeaconChainFlags = []cli.Flag{
DisableGossipSubFlag,
EnableExcessDepositsFlag,
NoGenesisDelayFlag,
UseNewP2PFlag,
EnableActiveBalanceCacheFlag,
EnableAttestationCacheFlag,
EnableAncestorBlockCacheFlag,