Use New Blockchain Service in RPC Package (#3286)

* new chain service usage via interface

* put in the new chain service in propose blk

* deprecate with new service for canonical block roots

* remove old chain serv absolutely in validator server

* full legacy code compatible in beacon server

* fully compliant

* full deprecation at service level

* no more mock chain serv

* fix beacon server tests

* add changes to prop server

* broken build

* --next compatible

* conditional register of chain service

* proper conversion

* nil deref
This commit is contained in:
Raul Jordan
2019-08-23 13:53:07 -05:00
committed by GitHub
parent 02ca2290e1
commit 2e8a06d6d4
10 changed files with 62 additions and 121 deletions

View File

@@ -416,9 +416,19 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
}
func (b *BeaconNode) registerRPCService(ctx *cli.Context) error {
var chainService *dblockchain.ChainService
if err := b.services.FetchService(&chainService); err != nil {
return err
var chainService interface{}
if featureconfig.FeatureConfig().UseNewBlockChainService {
var newChain *blockchain.ChainService
if err := b.services.FetchService(&newChain); err != nil {
return err
}
chainService = newChain
} else {
var deprecatedChain *dblockchain.ChainService
if err := b.services.FetchService(&deprecatedChain); err != nil {
return err
}
chainService = deprecatedChain
}
var operationService *operations.Service

View File

@@ -14,6 +14,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/cache/depositcache:go_default_library",
"//beacon-chain/core/blocks:go_default_library",

View File

@@ -9,15 +9,19 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
blockchain "github.com/prysmaticlabs/prysm/beacon-chain/deprecated-blockchain"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/trieutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type stateFeedListener interface {
StateInitializedFeed() *event.Feed
}
// BeaconServer defines a server implementation of the gRPC Beacon service,
// providing RPC endpoints for obtaining the canonical beacon chain head,
// fetching latest observed attestations, and more.
@@ -25,8 +29,7 @@ type BeaconServer struct {
beaconDB db.Database
ctx context.Context
powChainService powChainService
chainService chainService
targetsFetcher blockchain.TargetsFetcher
chainService stateFeedListener
operationService operationService
incomingAttestation chan *ethpb.Attestation
canonicalStateChan chan *pbp2p.BeaconState

View File

@@ -171,6 +171,12 @@ func (m *mockPOWChainService) ChainStartETH1Data() *ethpb.Eth1Data {
return m.eth1Data
}
type mockStateFeedListener struct{}
func (m *mockStateFeedListener) StateInitializedFeed() *event.Feed {
return new(event.Feed)
}
func TestWaitForChainStart_ContextClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
beaconServer := &BeaconServer{
@@ -178,7 +184,7 @@ func TestWaitForChainStart_ContextClosed(t *testing.T) {
powChainService: &faultyPOWChainService{
chainStartFeed: new(event.Feed),
},
chainService: newMockChainService(),
chainService: &mockStateFeedListener{},
}
exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
@@ -200,7 +206,7 @@ func TestWaitForChainStart_AlreadyStarted(t *testing.T) {
powChainService: &mockPOWChainService{
chainStartFeed: new(event.Feed),
},
chainService: newMockChainService(),
chainService: &mockStateFeedListener{},
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -224,7 +230,7 @@ func TestWaitForChainStart_NotStartedThenLogFired(t *testing.T) {
powChainService: &faultyPOWChainService{
chainStartFeed: new(event.Feed),
},
chainService: newMockChainService(),
chainService: &mockStateFeedListener{},
}
exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
@@ -453,8 +459,7 @@ func TestBlockTree_OK(t *testing.T) {
}
bs := &BeaconServer{
beaconDB: db,
targetsFetcher: &mockChainService{targets: attestationTargets},
beaconDB: db,
}
sort.Slice(tree, func(i, j int) bool {
return string(tree[i].Block.StateRoot) < string(tree[j].Block.StateRoot)
@@ -663,8 +668,7 @@ func TestBlockTreeBySlots_ArgsValildation(t *testing.T) {
t.Fatal(err)
}
bs := &BeaconServer{
beaconDB: db,
targetsFetcher: &mockChainService{targets: attestationTargets},
beaconDB: db,
}
if _, err := bs.BlockTreeBySlots(ctx, nil); err == nil {
// There should be a "argument 'TreeBlockSlotRequest' cannot be nil" error
@@ -881,8 +885,7 @@ func TestBlockTreeBySlots_OK(t *testing.T) {
}
bs := &BeaconServer{
beaconDB: db,
targetsFetcher: &mockChainService{targets: attestationTargets},
beaconDB: db,
}
slotRange := &pb.TreeBlockSlotRequest{
SlotFrom: 3,

View File

@@ -7,12 +7,14 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-ssz"
newBlockchain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
blockchain "github.com/prysmaticlabs/prysm/beacon-chain/deprecated-blockchain"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
@@ -27,7 +29,7 @@ import (
// beacon blocks to a beacon node, and more.
type ProposerServer struct {
beaconDB db.Database
chainService chainService
chainService interface{}
powChainService powChainService
operationService operationService
canonicalStateChan chan *pbp2p.BeaconState
@@ -120,25 +122,22 @@ func (ps *ProposerServer) ProposeBlock(ctx context.Context, blk *ethpb.BeaconBlo
log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf(
"Block proposal received via RPC")
beaconState, err := ps.chainService.ReceiveBlockDeprecated(ctx, blk)
if err != nil {
return nil, errors.Wrap(err, "could not process beacon block")
}
if d, isLegacyDB := ps.beaconDB.(*db.BeaconDB); isLegacyDB {
if err := d.UpdateChainHead(ctx, blk, beaconState); err != nil {
db, isLegacyDB := ps.beaconDB.(*db.BeaconDB)
if srv, isLegacyService := ps.chainService.(*blockchain.ChainService); isLegacyService && isLegacyDB {
beaconState, err := srv.ReceiveBlockDeprecated(ctx, blk)
if err != nil {
return nil, errors.Wrap(err, "could not process beacon block")
}
if err := db.UpdateChainHead(ctx, blk, beaconState); err != nil {
return nil, errors.Wrap(err, "failed to update chain")
}
ps.chainService.(*blockchain.ChainService).UpdateCanonicalRoots(blk, root)
} else {
if err := ps.beaconDB.(*kv.Store).SaveHeadBlockRoot(ctx, root); err != nil {
return nil, err
}
if err := ps.beaconDB.(*kv.Store).SaveState(ctx, beaconState, root); err != nil {
return nil, err
if err := ps.chainService.(*newBlockchain.ChainService).ReceiveBlock(ctx, blk); err != nil {
return nil, errors.Wrap(err, "could not process beacon block")
}
}
ps.chainService.UpdateCanonicalRoots(blk, root)
log.WithFields(logrus.Fields{
"headRoot": fmt.Sprintf("%#x", bytesutil.Trunc(root[:])),
"headSlot": blk.Slot,

View File

@@ -30,10 +30,11 @@ func init() {
}
func TestProposeBlock_OK(t *testing.T) {
// TODO(3225): Unskip after we have fully deprecated the old chain service.
t.Skip()
helpers.ClearAllCaches()
db := dbutil.SetupDB(t)
defer dbutil.TeardownDB(t, db)
mockChain := &mockChainService{}
ctx := context.Background()
genesis := b.NewGenesisBlock([]byte{})
@@ -60,7 +61,6 @@ func TestProposeBlock_OK(t *testing.T) {
}
proposerServer := &ProposerServer{
chainService: mockChain,
beaconDB: db,
powChainService: &mockPOWChainService{},
}
@@ -82,8 +82,6 @@ func TestComputeStateRoot_OK(t *testing.T) {
ctx := context.Background()
helpers.ClearAllCaches()
mockChain := &mockChainService{}
deposits, privKeys := testutil.SetupInitialDeposits(t, 100)
beaconState, err := state.GenesisBeaconState(deposits, 0, &ethpb.Eth1Data{})
if err != nil {
@@ -114,7 +112,6 @@ func TestComputeStateRoot_OK(t *testing.T) {
}
proposerServer := &ProposerServer{
chainService: mockChain,
beaconDB: db,
powChainService: &mockPOWChainService{},
}
@@ -243,8 +240,7 @@ func TestPendingAttestations_FiltersWithinInclusionDelay(t *testing.T) {
operationService: &mockOperationService{
pendingAttestations: []*ethpb.Attestation{att},
},
chainService: &mockChainService{},
beaconDB: db,
beaconDB: db,
}
blk := &ethpb.BeaconBlock{
Slot: beaconState.Slot,
@@ -405,7 +401,6 @@ func TestPendingAttestations_FiltersExpiredAttestations(t *testing.T) {
expectedNumberOfAttestations := 3
proposerServer := &ProposerServer{
operationService: opService,
chainService: &mockChainService{},
beaconDB: db,
}
@@ -510,7 +505,6 @@ func TestPendingDeposits_Eth1DataVoteOK(t *testing.T) {
bs := &ProposerServer{
beaconDB: db,
powChainService: p,
chainService: newMockChainService(),
}
blk := &ethpb.BeaconBlock{
@@ -688,7 +682,6 @@ func TestPendingDeposits_OutsideEth1FollowWindow(t *testing.T) {
bs := &ProposerServer{
beaconDB: db,
powChainService: p,
chainService: newMockChainService(),
depositCache: depositCache,
}
@@ -841,7 +834,6 @@ func TestPendingDeposits_FollowsCorrectEth1Block(t *testing.T) {
bs := &ProposerServer{
beaconDB: db,
powChainService: p,
chainService: newMockChainService(),
depositCache: depositCache,
}
@@ -966,7 +958,6 @@ func TestPendingDeposits_CantReturnBelowStateEth1DepositIndex(t *testing.T) {
bs := &ProposerServer{
beaconDB: db,
powChainService: p,
chainService: newMockChainService(),
depositCache: depositCache,
}
@@ -1083,7 +1074,6 @@ func TestPendingDeposits_CantReturnMoreThanMax(t *testing.T) {
bs := &ProposerServer{
beaconDB: db,
powChainService: p,
chainService: newMockChainService(),
depositCache: depositCache,
}
@@ -1198,7 +1188,6 @@ func TestPendingDeposits_CantReturnMoreDepositCount(t *testing.T) {
bs := &ProposerServer{
beaconDB: db,
powChainService: p,
chainService: newMockChainService(),
depositCache: depositCache,
}
@@ -1529,7 +1518,6 @@ func TestDeposits_ReturnsEmptyList_IfLatestEth1DataEqGenesisEth1Block(t *testing
bs := &ProposerServer{
beaconDB: db,
powChainService: p,
chainService: newMockChainService(),
depositCache: depositCache,
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
blockchain "github.com/prysmaticlabs/prysm/beacon-chain/deprecated-blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
@@ -39,13 +38,6 @@ func init() {
log = logrus.WithField("prefix", "rpc")
}
type chainService interface {
StateInitializedFeed() *event.Feed
blockchain.BlockReceiver
blockchain.ForkChoice
blockchain.TargetsFetcher
}
type operationService interface {
operations.Pool
IsAttCanonical(ctx context.Context, att *ethpb.Attestation) (bool, error)
@@ -74,7 +66,7 @@ type Service struct {
ctx context.Context
cancel context.CancelFunc
beaconDB db.Database
chainService chainService
chainService interface{}
powChainService powChainService
operationService operationService
syncService sync.Checker
@@ -96,7 +88,7 @@ type Config struct {
CertFlag string
KeyFlag string
BeaconDB db.Database
ChainService chainService
ChainService interface{}
POWChainService powChainService
OperationService operationService
SyncService sync.Checker
@@ -165,8 +157,7 @@ func (s *Service) Start() {
beaconDB: s.beaconDB,
ctx: s.ctx,
powChainService: s.powChainService,
chainService: s.chainService,
targetsFetcher: s.chainService,
chainService: s.chainService.(stateFeedListener),
operationService: s.operationService,
incomingAttestation: s.incomingAttestation,
canonicalStateChan: s.canonicalStateChan,
@@ -189,7 +180,6 @@ func (s *Service) Start() {
validatorServer := &ValidatorServer{
ctx: s.ctx,
beaconDB: s.beaconDB,
chainService: s.chainService,
canonicalStateChan: s.canonicalStateChan,
powChainService: s.powChainService,
depositCache: s.depositCache,

View File

@@ -1,7 +1,6 @@
package rpc
import (
"bytes"
"context"
"errors"
"fmt"
@@ -9,7 +8,6 @@ import (
"testing"
"github.com/gogo/protobuf/proto"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -78,56 +76,6 @@ func (ms *mockOperationService) AttestationPool(_ context.Context, expectedSlot
}, nil
}
type mockChainService struct {
blockFeed *event.Feed
stateFeed *event.Feed
attestationFeed *event.Feed
stateInitializedFeed *event.Feed
canonicalBlocks map[uint64][]byte
targets map[uint64]*pb.AttestationTarget
}
func (m *mockChainService) StateInitializedFeed() *event.Feed {
return m.stateInitializedFeed
}
func (m *mockChainService) ReceiveBlockDeprecated(ctx context.Context, block *ethpb.BeaconBlock) (*pb.BeaconState, error) {
return &pb.BeaconState{}, nil
}
func (m *mockChainService) ApplyForkChoiceRuleDeprecated(ctx context.Context, block *ethpb.BeaconBlock, computedState *pb.BeaconState) error {
return nil
}
func (m *mockChainService) CanonicalBlockFeed() *event.Feed {
return new(event.Feed)
}
func (m *mockChainService) UpdateCanonicalRoots(block *ethpb.BeaconBlock, root [32]byte) {
}
func (m mockChainService) SaveHistoricalState(beaconState *pb.BeaconState) error {
return nil
}
func (m mockChainService) IsCanonical(slot uint64, hash []byte) bool {
return bytes.Equal(m.canonicalBlocks[slot], hash)
}
func (m *mockChainService) AttestationTargets(justifiedState *pb.BeaconState) (map[uint64]*pb.AttestationTarget, error) {
return m.targets, nil
}
func newMockChainService() *mockChainService {
return &mockChainService{
blockFeed: new(event.Feed),
stateFeed: new(event.Feed),
attestationFeed: new(event.Feed),
stateInitializedFeed: new(event.Feed),
}
}
type mockSyncService struct {
}
@@ -142,10 +90,11 @@ func (ms *mockSyncService) Syncing() bool {
func TestLifecycle_OK(t *testing.T) {
hook := logTest.NewGlobal()
rpcService := NewRPCService(context.Background(), &Config{
Port: "7348",
CertFlag: "alice.crt",
KeyFlag: "alice.key",
SyncService: &mockSyncService{},
Port: "7348",
CertFlag: "alice.crt",
KeyFlag: "alice.key",
SyncService: &mockSyncService{},
ChainService: &mockStateFeedListener{},
})
rpcService.Start()
@@ -162,8 +111,9 @@ func TestRPC_BadEndpoint(t *testing.T) {
hook := logTest.NewGlobal()
rpcService := NewRPCService(context.Background(), &Config{
Port: "ralph merkle!!!",
SyncService: &mockSyncService{},
Port: "ralph merkle!!!",
SyncService: &mockSyncService{},
ChainService: &mockStateFeedListener{},
})
testutil.AssertLogsDoNotContain(t, hook, "Could not listen to port in Start()")
@@ -190,8 +140,9 @@ func TestStatus_CredentialError(t *testing.T) {
func TestRPC_InsecureEndpoint(t *testing.T) {
hook := logTest.NewGlobal()
rpcService := NewRPCService(context.Background(), &Config{
Port: "7777",
SyncService: &mockSyncService{},
Port: "7777",
SyncService: &mockSyncService{},
ChainService: &mockStateFeedListener{},
})
rpcService.Start()

View File

@@ -29,7 +29,6 @@ import (
type ValidatorServer struct {
ctx context.Context
beaconDB db.Database
chainService chainService
canonicalStateChan chan *pbp2p.BeaconState
powChainService powChainService
depositCache *depositcache.DepositCache

View File

@@ -882,7 +882,6 @@ func TestWaitForActivation_ContextClosed(t *testing.T) {
vs := &ValidatorServer{
beaconDB: db,
ctx: ctx,
chainService: newMockChainService(),
powChainService: &mockPOWChainService{},
canonicalStateChan: make(chan *pbp2p.BeaconState, 1),
depositCache: depositcache.NewDepositCache(),
@@ -993,7 +992,6 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) {
vs := &ValidatorServer{
beaconDB: db,
ctx: context.Background(),
chainService: newMockChainService(),
canonicalStateChan: make(chan *pbp2p.BeaconState, 1),
powChainService: &mockPOWChainService{},
depositCache: depositCache,
@@ -1115,7 +1113,6 @@ func TestMultipleValidatorStatus_OK(t *testing.T) {
vs := &ValidatorServer{
beaconDB: db,
ctx: context.Background(),
chainService: newMockChainService(),
canonicalStateChan: make(chan *pbp2p.BeaconState, 1),
powChainService: &mockPOWChainService{},
depositCache: depositCache,