RPC checks sync status (#3968)

* Check for sync status

* Tests
This commit is contained in:
terence tsao
2019-11-11 08:29:27 -08:00
committed by Raul Jordan
parent cc18b2f4d3
commit 62aaec1e20
6 changed files with 55 additions and 0 deletions

View File

@@ -13,10 +13,13 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// AttesterServer defines a server implementation of the gRPC Attester service,
@@ -28,6 +31,7 @@ type AttesterServer struct {
attReceiver blockchain.AttestationReceiver
headFetcher blockchain.HeadFetcher
attestationCache *cache.AttestationCache
syncChecker sync.Checker
}
// SubmitAttestation is a function called by an attester in a sharding validator to vote
@@ -63,6 +67,11 @@ func (as *AttesterServer) RequestAttestation(ctx context.Context, req *pb.Attest
trace.Int64Attribute("slot", int64(req.Slot)),
trace.Int64Attribute("shard", int64(req.Shard)),
)
if as.syncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
res, err := as.attestationCache.Get(ctx, req)
if err != nil {
return nil, err

View File

@@ -2,6 +2,7 @@ package rpc
import (
"context"
"strings"
"sync"
"testing"
@@ -129,6 +130,7 @@ func TestRequestAttestation_OK(t *testing.T) {
beaconState.BlockRoots[2*params.BeaconConfig().SlotsPerEpoch] = justifiedRoot[:]
attesterServer := &AttesterServer{
p2p: &mockp2p.MockBroadcaster{},
syncChecker: &mockSyncChecker{false},
attestationCache: cache.NewAttestationCache(),
headFetcher: &mock.ChainService{State: beaconState, Root: blockRoot[:]},
attReceiver: &mock.ChainService{State: beaconState, Root: blockRoot[:]},
@@ -169,6 +171,16 @@ func TestRequestAttestation_OK(t *testing.T) {
}
}
func TestRequestAttestation_SyncNotReady(t *testing.T) {
as := &AttesterServer{
syncChecker: &mockSyncChecker{syncing: true},
}
_, err := as.RequestAttestation(context.Background(), &pb.AttestationRequest{})
if strings.Contains(err.Error(), "syncing to latest head") {
t.Error("Did not get wanted error")
}
}
func TestAttestationDataAtSlot_handlesFarAwayJustifiedEpoch(t *testing.T) {
// Scenario:
//
@@ -233,6 +245,7 @@ func TestAttestationDataAtSlot_handlesFarAwayJustifiedEpoch(t *testing.T) {
attestationCache: cache.NewAttestationCache(),
headFetcher: &mock.ChainService{State: beaconState, Root: blockRoot[:]},
attReceiver: &mock.ChainService{State: beaconState, Root: blockRoot[:]},
syncChecker: &mockSyncChecker{false},
}
req := &pb.AttestationRequest{
@@ -282,6 +295,7 @@ func TestAttestationDataAtSlot_handlesInProgressRequest(t *testing.T) {
ctx := context.Background()
server := &AttesterServer{
attestationCache: cache.NewAttestationCache(),
syncChecker: &mockSyncChecker{false},
}
req := &pb.AttestationRequest{

View File

@@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
@@ -24,6 +25,8 @@ import (
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/trieutil"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ProposerServer defines a server implementation of the gRPC Proposer service,
@@ -41,6 +44,7 @@ type ProposerServer struct {
canonicalStateChan chan *pbp2p.BeaconState
depositFetcher depositcache.DepositFetcher
pendingDepositsFetcher depositcache.PendingDepositsFetcher
syncChecker sync.Checker
}
// RequestBlock is called by a proposer during its assigned slot to request a block to sign
@@ -50,6 +54,10 @@ func (ps *ProposerServer) RequestBlock(ctx context.Context, req *pb.BlockRequest
defer span.End()
span.AddAttributes(trace.Int64Attribute("slot", int64(req.Slot)))
if ps.syncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
// Retrieve the parent block as the current head of the canonical chain
parent := ps.headFetcher.HeadBlock()

View File

@@ -176,6 +176,7 @@ func (s *Service) Start() {
canonicalStateChan: s.canonicalStateChan,
depositFetcher: s.depositFetcher,
pendingDepositsFetcher: s.pendingDepositFetcher,
syncChecker: s.syncService,
}
attesterServer := &AttesterServer{
p2p: s.p2p,
@@ -184,6 +185,7 @@ func (s *Service) Start() {
attReceiver: s.attestationReceiver,
headFetcher: s.headFetcher,
attestationCache: cache.NewAttestationCache(),
syncChecker: s.syncService,
}
validatorServer := &ValidatorServer{
ctx: s.ctx,
@@ -195,6 +197,7 @@ func (s *Service) Start() {
chainStartFetcher: s.chainStartFetcher,
eth1InfoFetcher: s.powChainService,
depositFetcher: s.depositFetcher,
syncChecker: s.syncService,
stateFeedListener: s.stateFeedListener,
chainStartChan: make(chan time.Time),
}

View File

@@ -7,6 +7,7 @@ import (
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -37,6 +38,7 @@ type ValidatorServer struct {
depositFetcher depositcache.DepositFetcher
chainStartFetcher powchain.ChainStartFetcher
eth1InfoFetcher powchain.ChainInfoFetcher
syncChecker sync.Checker
stateFeedListener blockchain.ChainFeeds
chainStartChan chan time.Time
}
@@ -150,6 +152,10 @@ func (vs *ValidatorServer) ValidatorPerformance(
// 3.) The slot at which the committee is assigned.
// 4.) The bool signaling if the validator is expected to propose a block at the assigned slot.
func (vs *ValidatorServer) CommitteeAssignment(ctx context.Context, req *pb.AssignmentRequest) (*pb.AssignmentResponse, error) {
if vs.syncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
var err error
s := vs.headFetcher.HeadState()
// Advance state with empty transitions up to the requested epoch start slot.

View File

@@ -84,6 +84,7 @@ func TestNextEpochCommitteeAssignment_WrongPubkeyLength(t *testing.T) {
validatorServer := &ValidatorServer{
beaconDB: db,
headFetcher: &mockChain.ChainService{State: beaconState, Root: genesisRoot[:]},
syncChecker: &mockSyncChecker{false},
}
req := &pb.AssignmentRequest{
PublicKeys: [][]byte{{1}},
@@ -113,6 +114,7 @@ func TestNextEpochCommitteeAssignment_CantFindValidatorIdx(t *testing.T) {
vs := &ValidatorServer{
beaconDB: db,
headFetcher: &mockChain.ChainService{State: beaconState, Root: genesisRoot[:]},
syncChecker: &mockSyncChecker{false},
}
pubKey := make([]byte, 96)
@@ -166,6 +168,7 @@ func TestCommitteeAssignment_OK(t *testing.T) {
vs := &ValidatorServer{
beaconDB: db,
headFetcher: &mockChain.ChainService{State: state, Root: genesisRoot[:]},
syncChecker: &mockSyncChecker{false},
}
// Test the first validator in registry.
@@ -248,6 +251,7 @@ func TestCommitteeAssignment_CurrentEpoch_ShouldNotFail(t *testing.T) {
vs := &ValidatorServer{
beaconDB: db,
headFetcher: &mockChain.ChainService{State: state, Root: genesisRoot[:]},
syncChecker: &mockSyncChecker{false},
}
// Test the first validator in registry.
@@ -302,6 +306,7 @@ func TestCommitteeAssignment_MultipleKeys_OK(t *testing.T) {
vs := &ValidatorServer{
beaconDB: db,
headFetcher: &mockChain.ChainService{State: state, Root: genesisRoot[:]},
syncChecker: &mockSyncChecker{false},
}
pubkey0 := deposits[0].Data.PublicKey
@@ -322,6 +327,16 @@ func TestCommitteeAssignment_MultipleKeys_OK(t *testing.T) {
}
}
func TestCommitteeAssignment_SyncNotReady(t *testing.T) {
vs := &ValidatorServer{
syncChecker: &mockSyncChecker{syncing: true},
}
_, err := vs.CommitteeAssignment(context.Background(), &pb.AssignmentRequest{})
if strings.Contains(err.Error(), "syncing to latest head") {
t.Error("Did not get wanted error")
}
}
func TestValidatorStatus_DepositReceived(t *testing.T) {
db := dbutil.SetupDB(t)
defer dbutil.TeardownDB(t, db)