Various cleanup and bugfixes around validator/RBC interaction (#657)

This commit is contained in:
Yutaro Mori
2018-10-15 00:29:57 +09:00
committed by Raul Jordan
parent 6d46dda33c
commit 8bffae1316
24 changed files with 251 additions and 392 deletions

View File

@@ -23,7 +23,6 @@ type rpcClientService interface {
type beaconClientService interface {
AttesterAssignmentFeed() *event.Feed
PublicKey() []byte
}
// Attester holds functionality required to run a block attester
@@ -35,6 +34,7 @@ type Attester struct {
rpcClientService rpcClientService
assignmentChan chan *pbp2p.BeaconBlock
shardID uint64
publicKey []byte
}
// Config options for an attester service.
@@ -43,6 +43,7 @@ type Config struct {
ShardID uint64
Assigner beaconClientService
Client rpcClientService
PublicKey []byte
}
// NewAttester creates a new attester instance.
@@ -54,6 +55,7 @@ func NewAttester(ctx context.Context, cfg *Config) *Attester {
beaconService: cfg.Assigner,
rpcClientService: cfg.Client,
shardID: cfg.ShardID,
publicKey: cfg.PublicKey,
assignmentChan: make(chan *pbp2p.BeaconBlock, cfg.AssignmentBuf),
}
}
@@ -94,7 +96,7 @@ func (a *Attester) run(attester pb.AttesterServiceClient, validator pb.Validator
latestBlockHash := hashutil.Hash(data)
pubKeyReq := &pb.PublicKey{
PublicKey: a.beaconService.PublicKey(),
PublicKey: a.publicKey,
}
shardID, err := validator.ValidatorShardID(a.ctx, pubKeyReq)
if err != nil {
@@ -126,7 +128,7 @@ func (a *Attester) run(attester pb.AttesterServiceClient, validator pb.Validator
log.Errorf("could not attest head: %v", err)
continue
}
log.Infof("Attestation proposed successfully with hash 0x%x", res.AttestationHash)
log.Infof("Attestation proposed successfully with hash %#x", res.AttestationHash)
}
}
}

View File

@@ -8,8 +8,8 @@ go_library(
deps = [
"//proto/beacon/rpc/v1:go_default_library",
"//shared/event:go_default_library",
"//shared/slotticker:go_default_library",
"//validator/params:go_default_library",
"//validator/utils:go_default_library",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_bazel_rules_go//proto/wkt:empty_go_proto",
@@ -25,7 +25,6 @@ go_test(
"//proto/beacon/rpc/v1:go_default_library",
"//shared/testutil:go_default_library",
"//validator/internal:go_default_library",
"//validator/params:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
"@com_github_sirupsen_logrus//:go_default_library",

View File

@@ -3,16 +3,16 @@ package beacon
import (
"bytes"
"context"
"fmt"
"io"
"math"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/slotticker"
"github.com/prysmaticlabs/prysm/validator/params"
"github.com/prysmaticlabs/prysm/validator/utils"
"github.com/sirupsen/logrus"
)
@@ -35,7 +35,6 @@ type Service struct {
proposerAssignmentFeed *event.Feed
processedAttestationFeed *event.Feed
genesisTimestamp time.Time
slotAlignmentDuration time.Duration
}
// NewBeaconValidator instantiates a service that interacts with a beacon node
@@ -50,7 +49,6 @@ func NewBeaconValidator(ctx context.Context, pubKey []byte, rpcClient rpcClientS
attesterAssignmentFeed: new(event.Feed),
proposerAssignmentFeed: new(event.Feed),
processedAttestationFeed: new(event.Feed),
slotAlignmentDuration: time.Duration(params.DefaultConfig().SlotDuration) * time.Second,
}
}
@@ -69,23 +67,21 @@ func (s *Service) Start() {
// Note: this does not validate the current system time against a global
// NTP server, which will be important to do in production.
// currently in a cycle we are supposed to participate in.
s.fetchCurrentAssignmentsAndGenesisTime(beaconServiceClient)
// Then, we kick off a routine that uses the begins a ticker based on the beacon node's
// genesis timestamp and the validator will use this slot ticker to
// determine when it is assigned to perform proposals or attestations.
//
// We block until the current time is a multiple of params.SlotDuration
// so the validator and beacon node's internal tickers are aligned.
utils.BlockingWait(s.slotAlignmentDuration)
if err := s.fetchCurrentAssignmentsAndGenesisTime(beaconServiceClient); err != nil {
log.Error(err)
return
}
// We kick off a routine that listens for stream of validator assignment coming from
// beacon node. This will update validator client on which slot, shard ID and what
// responsbility to perform.
go s.listenForAssignmentChange(beaconServiceClient)
slotTicker := time.NewTicker(s.slotAlignmentDuration)
go s.waitForAssignment(slotTicker.C, beaconServiceClient)
slotTicker := slotticker.GetSlotTicker(s.genesisTimestamp, params.DemoConfig().SlotDuration)
go func() {
s.waitForAssignment(slotTicker.C(), beaconServiceClient)
slotTicker.Done()
}()
go s.listenForProcessedAttestations(beaconServiceClient)
}
@@ -110,7 +106,7 @@ func (s *Service) Stop() error {
//
// From this, the validator client can deduce what slot interval the beacon
// node is in and determine when exactly it is time to propose or attest.
func (s *Service) fetchCurrentAssignmentsAndGenesisTime(client pb.BeaconServiceClient) {
func (s *Service) fetchCurrentAssignmentsAndGenesisTime(client pb.BeaconServiceClient) error {
// Currently fetches assignments for all validators.
req := &pb.ValidatorAssignmentRequest{
AllValidators: true,
@@ -119,32 +115,23 @@ func (s *Service) fetchCurrentAssignmentsAndGenesisTime(client pb.BeaconServiceC
if err != nil {
// If this RPC request fails, the entire system should fatal as it is critical for
// the validator to begin this way.
log.Fatalf("could not fetch genesis time and latest canonical state from beacon node: %v", err)
return fmt.Errorf("could not fetch genesis time and latest canonical state from beacon node: %v", err)
}
// Determine what slot the beacon node is in by checking the number of seconds
// since the genesis block.
genesisTimestamp, err := ptypes.Timestamp(res.GetGenesisTimestamp())
if err != nil {
log.Fatalf("cannot compute genesis timestamp: %v", err)
return fmt.Errorf("cannot compute genesis timestamp: %v", err)
}
log.Infof("Setting validator genesis time to %s", genesisTimestamp.Format(time.UnixDate))
s.genesisTimestamp = genesisTimestamp
for _, assign := range res.Assignments {
if bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) {
s.role = assign.Role
// + 1 to account for the genesis block being slot 0.
s.assignedSlot = s.CurrentCycleStartSlot(params.DemoConfig().CycleLength) + assign.AssignedSlot + 1
s.shardID = assign.ShardId
log.Infof("Validator shuffled. Pub key 0x%s re-assigned to shard ID %d for %v duty at slot %d",
string(s.pubKey),
s.shardID,
s.role,
s.assignedSlot)
}
startSlot := s.startSlot()
if err := s.assignRole(res.Assignments, startSlot); err != nil {
return fmt.Errorf("unable to assign a role: %v", err)
}
return nil
}
// listenForAssignmentChange listens for validator assignment changes via a RPC stream.
@@ -153,7 +140,7 @@ func (s *Service) listenForAssignmentChange(client pb.BeaconServiceClient) {
req := &pb.ValidatorAssignmentRequest{PublicKeys: []*pb.PublicKey{{PublicKey: s.pubKey}}}
stream, err := client.ValidatorAssignments(s.ctx, req)
if err != nil {
log.Errorf("could not fetch validator assigned slot and responsibility from beacon node: %v", err)
log.Errorf("failed to fetch validator assignments stream: %v", err)
return
}
for {
@@ -173,55 +160,43 @@ func (s *Service) listenForAssignmentChange(client pb.BeaconServiceClient) {
break
}
for _, assign := range assignment.Assignments {
if bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) {
s.role = assign.Role
if s.CurrentCycleStartSlot(params.DemoConfig().CycleLength) == 0 {
// +1 to account for genesis block being slot 0.
s.assignedSlot = params.DemoConfig().CycleLength + assign.AssignedSlot + 1
} else {
s.assignedSlot = s.CurrentCycleStartSlot(params.DemoConfig().CycleLength) + assign.AssignedSlot + 1
}
s.shardID = assign.ShardId
log.Infof("Validator with pub key 0x%s re-assigned to shard ID %d for %v duty at slot %d",
string(s.pubKey),
s.shardID,
s.role,
s.assignedSlot)
}
startSlot := s.startSlot()
if err := s.assignRole(assignment.Assignments, startSlot); err != nil {
log.Errorf("Could not assign a role for validator: %v", err)
break
}
}
}
// waitForAssignment waits till it's validator's role to attest or propose. Then it forwards
// the canonical block to the proposer service or attester service to process.
func (s *Service) waitForAssignment(ticker <-chan time.Time, client pb.BeaconServiceClient) {
func (s *Service) waitForAssignment(ticker <-chan uint64, client pb.BeaconServiceClient) {
for {
select {
case <-s.ctx.Done():
return
case <-ticker:
currentSlot := s.CurrentBeaconSlot()
log.Infof("role: %v, assigned slot: %d, current slot: %d", s.role, s.assignedSlot, currentSlot)
if s.role == pb.ValidatorRole_ATTESTER && s.assignedSlot == currentSlot {
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned attest slot number reached")
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
case slot := <-ticker:
log = log.WithField("slot", slot)
log.Infof("tick")
// Special case: skip responsibilities if assigned to the genesis block.
if s.assignedSlot != slot || s.assignedSlot == 0 {
continue
}
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
if s.role == pb.ValidatorRole_ATTESTER {
log.Info("Assigned attestation slot number reached")
// We forward the latest canonical block to the attester service a feed.
s.attesterAssignmentFeed.Send(block)
} else if s.role == pb.ValidatorRole_PROPOSER && s.assignedSlot == currentSlot {
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned proposal slot number reached")
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
} else if s.role == pb.ValidatorRole_PROPOSER {
log.Info("Assigned proposal slot number reached")
// We forward the latest canonical block to the proposer service a feed.
s.proposerAssignmentFeed.Send(block)
}
@@ -259,6 +234,52 @@ func (s *Service) listenForProcessedAttestations(client pb.BeaconServiceClient)
}
}
// startSlot returns the first slot of the given slot's cycle.
func (s *Service) startSlot() uint64 {
duration := params.DemoConfig().SlotDuration
cycleLength := params.DemoConfig().CycleLength
slot := slotticker.CurrentSlot(s.genesisTimestamp, duration, time.Since)
return slot - slot%cycleLength
}
func (s *Service) assignRole(assignments []*pb.Assignment, startSlot uint64) error {
var role pb.ValidatorRole
var assignedSlot uint64
var shardID uint64
for _, assign := range assignments {
if !bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) {
continue
}
role = assign.Role
assignedSlot = startSlot + assign.AssignedSlot
shardID = assign.ShardId
log.Infof("Validator shuffled. Pub key %#x assigned to shard ID %d for %v duty at slot %d",
s.pubKey,
shardID,
role,
assignedSlot)
break
}
if role == pb.ValidatorRole_UNKNOWN {
return fmt.Errorf("validator role was not assigned for key: %x", s.pubKey)
}
s.role = role
s.assignedSlot = assignedSlot
s.shardID = shardID
log = log.WithFields(logrus.Fields{
"role": role,
"assignedSlot": assignedSlot,
"shardID": shardID,
})
return nil
}
// AttesterAssignmentFeed returns a feed that is written to whenever it is the validator's
// slot to perform attestations.
func (s *Service) AttesterAssignmentFeed() *event.Feed {
@@ -276,24 +297,3 @@ func (s *Service) ProposerAssignmentFeed() *event.Feed {
func (s *Service) ProcessedAttestationFeed() *event.Feed {
return s.processedAttestationFeed
}
// PublicKey returns validator's public key.
func (s *Service) PublicKey() []byte {
return s.pubKey
}
// CurrentBeaconSlot based on the genesis timestamp of the protocol.
func (s *Service) CurrentBeaconSlot() uint64 {
secondsSinceGenesis := time.Since(s.genesisTimestamp).Seconds()
if secondsSinceGenesis-params.DefaultConfig().SlotDuration < 0 {
return 0
}
return uint64(math.Floor(secondsSinceGenesis/params.DefaultConfig().SlotDuration)) - 1
}
// CurrentCycleStartSlot returns the slot at which the current cycle started.
func (s *Service) CurrentCycleStartSlot(cycleLength uint64) uint64 {
currentSlot := s.CurrentBeaconSlot()
cycleNum := currentSlot / cycleLength
return uint64(cycleNum) * cycleLength
}

View File

@@ -1,7 +1,6 @@
package beacon
import (
"bytes"
"context"
"errors"
"io"
@@ -16,7 +15,6 @@ import (
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/validator/internal"
"github.com/prysmaticlabs/prysm/validator/params"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -56,6 +54,15 @@ func (fc *mockLifecycleClient) BeaconServiceClient() pb.BeaconServiceClient {
gomock.Any(),
).Return(&pb.CurrentAssignmentsResponse{
GenesisTimestamp: ptypes.TimestampNow(),
Assignments: []*pb.Assignment{
{
PublicKey: &pb.PublicKey{
PublicKey: []byte{0},
},
ShardId: 0,
Role: pb.ValidatorRole_PROPOSER,
},
},
}, nil)
attesterStream := internal.NewMockBeaconService_LatestAttestationClient(fc.ctrl)
@@ -90,10 +97,9 @@ func TestLifecycle(t *testing.T) {
if b.ProcessedAttestationFeed() == nil {
t.Error("ProcessedAttestationFeed empty")
}
if !bytes.Equal(b.PublicKey(), []byte{}) {
t.Error("Incorrect public key")
}
b.slotAlignmentDuration = time.Millisecond * 10
b.pubKey = []byte{0}
b.Start()
time.Sleep(time.Millisecond * 10)
testutil.AssertLogsContain(t, hook, "Starting service")
@@ -101,16 +107,6 @@ func TestLifecycle(t *testing.T) {
testutil.AssertLogsContain(t, hook, "Stopping service")
}
func TestCurrentBeaconSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), []byte{}, &mockLifecycleClient{ctrl})
b.genesisTimestamp = time.Now()
if b.CurrentBeaconSlot() != 0 {
t.Errorf("Expected us to be in the 0th slot, received %v", b.CurrentBeaconSlot())
}
}
func TestWaitForAssignmentProposer(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
@@ -124,16 +120,15 @@ func TestWaitForAssignmentProposer(t *testing.T) {
).Return(nil, nil)
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
slotChan := make(chan uint64)
go func() {
b.waitForAssignment(timeChan, mockServiceClient)
b.waitForAssignment(slotChan, mockServiceClient)
<-exitRoutine
}()
b.role = pb.ValidatorRole_PROPOSER
b.genesisTimestamp = time.Now()
b.assignedSlot = 0
timeChan <- time.Now()
b.assignedSlot = 1
slotChan <- 1
b.cancel()
exitRoutine <- true
@@ -153,16 +148,15 @@ func TestWaitForAssignmentProposerError(t *testing.T) {
).Return(nil, errors.New("failed"))
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
slotChan := make(chan uint64)
go func() {
b.waitForAssignment(timeChan, mockServiceClient)
b.waitForAssignment(slotChan, mockServiceClient)
<-exitRoutine
}()
b.role = pb.ValidatorRole_PROPOSER
b.genesisTimestamp = time.Now()
b.assignedSlot = 0
timeChan <- time.Now()
b.assignedSlot = 1
slotChan <- 1
b.cancel()
exitRoutine <- true
@@ -182,20 +176,19 @@ func TestWaitForAssignmentAttester(t *testing.T) {
).Return(nil, nil)
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
slotChan := make(chan uint64)
go func() {
b.waitForAssignment(timeChan, mockServiceClient)
b.waitForAssignment(slotChan, mockServiceClient)
<-exitRoutine
}()
b.role = pb.ValidatorRole_ATTESTER
b.genesisTimestamp = time.Now()
b.assignedSlot = 0
timeChan <- time.Now()
b.assignedSlot = 1
slotChan <- 1
b.cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "Assigned attest slot number reached")
testutil.AssertLogsContain(t, hook, "Assigned attestation slot number reached")
}
func TestWaitForAssignmentAttesterError(t *testing.T) {
@@ -211,16 +204,15 @@ func TestWaitForAssignmentAttesterError(t *testing.T) {
).Return(nil, errors.New("failed"))
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
slotChan := make(chan uint64)
go func() {
b.waitForAssignment(timeChan, mockServiceClient)
b.waitForAssignment(slotChan, mockServiceClient)
<-exitRoutine
}()
b.role = pb.ValidatorRole_ATTESTER
b.genesisTimestamp = time.Now()
b.assignedSlot = 0
timeChan <- time.Now()
b.assignedSlot = 1
slotChan <- 1
b.cancel()
exitRoutine <- true
@@ -302,11 +294,10 @@ func TestListenForAssignmentProposer(t *testing.T) {
stream := internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl)
// Testing proposer assignment.
assignedSlot := b.CurrentCycleStartSlot(params.DefaultConfig().CycleLength) + 2
stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{Assignments: []*pb.Assignment{{
PublicKey: &pb.PublicKey{PublicKey: []byte{'A'}},
ShardId: 2,
AssignedSlot: assignedSlot,
AssignedSlot: 2,
Role: pb.ValidatorRole_PROPOSER}}}, nil)
stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, io.EOF)
@@ -316,9 +307,11 @@ func TestListenForAssignmentProposer(t *testing.T) {
gomock.Any(),
).Return(stream, nil)
b.genesisTimestamp = time.Now()
b.pubKey = []byte{'A'}
b.listenForAssignmentChange(mockServiceValidator)
testutil.AssertLogsContain(t, hook, "Validator with pub key 0xA re-assigned to shard ID 2 for PROPOSER duty")
testutil.AssertLogsContain(t, hook, "Validator shuffled. Pub key 0x41 assigned to shard ID 2 for PROPOSER duty")
}
func TestListenForAssignmentError(t *testing.T) {
@@ -357,7 +350,6 @@ func TestListenForAssignmentClientError(t *testing.T) {
b.listenForAssignmentChange(mockServiceValidator)
testutil.AssertLogsContain(t, hook, "stream creation failed")
testutil.AssertLogsContain(t, hook, "could not fetch validator assigned slot and responsibility from beacon node")
}
func TestListenForAssignmentCancelContext(t *testing.T) {

View File

@@ -93,7 +93,7 @@ func NewShardInstance(ctx *cli.Context) (*ShardEthereum, error) {
return nil, err
}
if err := shardEthereum.registerAttesterService(); err != nil {
if err := shardEthereum.registerAttesterService(pubKey); err != nil {
return nil, err
}
@@ -199,7 +199,7 @@ func (s *ShardEthereum) registerBeaconService(pubKey []byte) error {
}
// registerAttesterService that listens to assignments from the beacon service.
func (s *ShardEthereum) registerAttesterService() error {
func (s *ShardEthereum) registerAttesterService(pubKey []byte) error {
var beaconService *beacon.Service
if err := s.services.FetchService(&beaconService); err != nil {
return err
@@ -214,6 +214,7 @@ func (s *ShardEthereum) registerAttesterService() error {
Assigner: beaconService,
AssignmentBuf: 100,
Client: rpcService,
PublicKey: pubKey,
})
return s.services.RegisterService(att)
}

View File

@@ -32,7 +32,7 @@ func DefaultCollationSizeLimit() int64 {
// Config contains configs for node to participate in the sharded universe.
type Config struct {
CollationSizeLimit int64 // CollationSizeLimit is the maximum size the serialized blobs in a collation can take.
SlotDuration float64 // SlotDuration in seconds.
CollationSizeLimit int64 // CollationSizeLimit is the maximum size the serialized blobs in a collation can take.
SlotDuration uint64 // SlotDuration in seconds.
CycleLength uint64
}

View File

@@ -77,7 +77,6 @@ func (p *Proposer) Start() {
go p.run(p.ctx.Done(), client)
go p.processAttestation(p.ctx.Done())
}
// Stop the main loop.
@@ -183,7 +182,7 @@ func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) {
continue
}
log.Infof("Block proposed successfully with hash 0x%x", res.BlockHash)
log.Infof("Block proposed successfully with hash %#x", res.BlockHash)
p.pendingAttestation = nil
p.lock.Unlock()
}

View File

@@ -134,7 +134,7 @@ func TestProposerReceiveBeaconBlock(t *testing.T) {
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "Performing proposer responsibility")
testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash 0x%x", []byte("hi")))
testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash %#x", []byte("hi")))
testutil.AssertLogsContain(t, hook, "Proposer context closed")
}
@@ -226,7 +226,7 @@ func TestFullProposalOfBlock(t *testing.T) {
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "Performing proposer responsibility")
testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash 0x%x", []byte("hi")))
testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash %#x", []byte("hi")))
testutil.AssertLogsContain(t, hook, "Proposer context closed")
testutil.AssertLogsContain(t, hook, "Attestation stored in memory")
testutil.AssertLogsContain(t, hook, "Proposer context closed")