Validator Clients Use an Internal Clock to Determine Slot Responsibility (#467)

This commit is contained in:
Raul Jordan
2018-09-21 09:32:20 -05:00
committed by GitHub
parent 9d93312e30
commit 190a976d3d
22 changed files with 845 additions and 997 deletions

View File

@@ -6,12 +6,13 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/validator/beacon",
visibility = ["//validator:__subpackages__"],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"//validator/params:go_default_library",
"@com_github_ethereum_go_ethereum//event:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_bazel_rules_go//proto/wkt:empty_go_proto",
"@org_golang_x_crypto//blake2b:go_default_library",
],
)
@@ -25,6 +26,7 @@ go_test(
"//shared/testutil:go_default_library",
"//validator/internal:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@io_bazel_rules_go//proto/wkt:empty_go_proto",

View File

@@ -4,13 +4,16 @@ import (
"bytes"
"context"
"io"
"math"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/validator/params"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/blake2b"
)
var log = logrus.WithField("prefix", "beacon")
@@ -30,9 +33,11 @@ type Service struct {
attesterAssignmentFeed *event.Feed
proposerAssignmentFeed *event.Feed
processedAttestationFeed *event.Feed
genesisTimestamp time.Time
}
// NewBeaconValidator instantiates a service that interacts with a beacon node.
// NewBeaconValidator instantiates a service that interacts with a beacon node
// via gRPC requests.
func NewBeaconValidator(ctx context.Context, rpcClient rpcClientService) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
@@ -45,13 +50,34 @@ func NewBeaconValidator(ctx context.Context, rpcClient rpcClientService) *Servic
}
}
// Start the main routine for a beacon service.
// Start the main routine for a beacon client service.
func (s *Service) Start() {
log.Info("Starting service")
client := s.rpcClient.BeaconServiceClient()
go s.fetchBeaconBlocks(client)
go s.fetchCrystallizedState(client)
go s.fetchProcessedAttestations(client)
// First thing the validator does is request the genesis block timestamp
// and the latest, canonical crystallized state from a beacon node. From here,
// a validator can determine its assigned slot by keeping an internal
// ticker that starts at the current slot the beacon node is in. This current slot
// value is determined by taking the time differential between the genesis block
// time and the current system time.
//
// Note: this does not validate the current system time against a global
// NTP server, which will be important to do in production.
// currently in a cycle we are supposed to participate in.
s.fetchGenesisAndCanonicalState(client)
// Then, we kick off a routine that uses the begins a ticker set in fetchGenesisAndCanonicalState
// to wait until the validator's assigned slot to perform proposals or attestations.
slotTicker := time.NewTicker(time.Second * time.Duration(params.DefaultConfig().SlotDuration))
go s.waitForAssignment(slotTicker.C, client)
// We then kick off a routine that listens for streams of cycle transitions
// coming from the beacon node. This will allow the validator client to recalculate
// when it has to perform its responsibilities appropriately using timestamps
// and the IndicesForSlots field inside the received crystallized state.
go s.listenForCrystallizedStates(client)
go s.listenForProcessedAttestations(client)
}
// Stop the main loop..
@@ -61,60 +87,90 @@ func (s *Service) Stop() error {
return nil
}
// AttesterAssignmentFeed returns a feed that is written to whenever it is the validator's
// slot to perform attestations.
func (s *Service) AttesterAssignmentFeed() *event.Feed {
return s.attesterAssignmentFeed
// CurrentBeaconSlot based on the seconds since genesis.
func (s *Service) CurrentBeaconSlot() uint64 {
secondsSinceGenesis := time.Since(s.genesisTimestamp).Seconds()
return uint64(math.Floor(secondsSinceGenesis / 8.0))
}
// ProposerAssignmentFeed returns a feed that is written to whenever it is the validator's
// slot to proposer blocks.
func (s *Service) ProposerAssignmentFeed() *event.Feed {
return s.proposerAssignmentFeed
}
// ProcessedAttestationFeed returns a feed that is wriiten to whenever a validator receives an
// attestation from the beacon node.
func (s *Service) ProcessedAttestationFeed() *event.Feed {
return s.processedAttestationFeed
}
func (s *Service) fetchBeaconBlocks(client pb.BeaconServiceClient) {
stream, err := client.LatestBeaconBlock(s.ctx, &empty.Empty{})
// fetchGenesisAndCanonicalState fetches both the genesis timestamp as well
// as the latest canonical crystallized state from a beacon node. This allows
// the validator to do the following:
//
// (1) determine if it should act as an attester/proposer and at what slot
// and what shard
//
// (2) determine the seconds since genesis by using the latest crystallized
// state recalc, then determine how many seconds have passed between that time
// and the current system time.
//
// From this, the validator client can deduce what slot interval the beacon
// node is in and determine when exactly it is time to propose or attest.
func (s *Service) fetchGenesisAndCanonicalState(client pb.BeaconServiceClient) {
res, err := client.GenesisTimeAndCanonicalState(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not setup beacon chain block streaming client: %v", err)
return
// If this RPC request fails, the entire system should fatal as it is critical for
// the validator to begin this way.
log.Fatalf("could not fetch genesis time and latest canonical state from beacon node: %v", err)
}
// Determine what slot the beacon node is in by checking the number of seconds
// since the genesis block.
genesisTimestamp, err := ptypes.Timestamp(res.GetGenesisTimestamp())
if err != nil {
log.Fatalf("cannot compute genesis timestamp: %v", err)
}
s.genesisTimestamp = genesisTimestamp
crystallized := res.GetLatestCrystallizedState()
if err := s.processCrystallizedState(crystallized); err != nil {
log.Fatalf("unable to process received crystallized state: %v", err)
}
}
// waitForAssignment kicks off once the validator determines the currentSlot of the
// beacon node by calculating the difference between the current system time
// and the genesis timestamp. It runs exactly every SLOT_LENGTH seconds
// and checks if it is time for the validator to act as a proposer or attester.
func (s *Service) waitForAssignment(ticker <-chan time.Time, client pb.BeaconServiceClient) {
for {
block, err := stream.Recv()
// If the stream is closed, we stop the loop.
if err == io.EOF {
break
}
if err != nil {
log.Errorf("Could not receive latest beacon block from stream: %v", err)
continue
}
log.WithField("slotNumber", block.GetSlotNumber()).Info("Latest beacon block slot number")
// Based on the slot determined from the latest crystallized state, check if
// it matches the latest received beacon slot.
if s.responsibility == "proposer" {
log.WithField("slotNumber", block.GetSlotNumber()).Info("Assigned proposal slot number reached")
s.responsibility = ""
s.proposerAssignmentFeed.Send(block)
} else if s.responsibility == "attester" && block.GetSlotNumber() == s.assignedSlot {
// TODO: Let the validator know a few slots in advance if its attestation slot is coming up
log.Info("Assigned attestation slot number reached")
s.responsibility = ""
s.attesterAssignmentFeed.Send(block)
select {
case <-s.ctx.Done():
return
case <-ticker:
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("New beacon node slot interval")
if s.responsibility == "proposer" && s.assignedSlot == s.CurrentBeaconSlot() {
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned proposal slot number reached")
s.responsibility = ""
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
// We forward the latest canonical block to the proposer service via a feed.
s.proposerAssignmentFeed.Send(block)
} else if s.responsibility == "attester" && s.assignedSlot == s.CurrentBeaconSlot() {
log.Info("Assigned attestation slot number reached")
s.responsibility = ""
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
// We forward the latest canonical block to the attester service a feed.
s.attesterAssignmentFeed.Send(block)
}
}
}
}
func (s *Service) fetchCrystallizedState(client pb.BeaconServiceClient) {
var activeValidatorIndices []int
// listenForCrystallizedStates receives the latest canonical crystallized state
// from the beacon node's RPC server via gRPC streams.
// TODO(#545): Rename to listen for assignment instead, which is streamed from a beacon node
// upon every new cycle transition and will include the validator's index in the
// assignment bitfield as well as the assigned shard ID.
func (s *Service) listenForCrystallizedStates(client pb.BeaconServiceClient) {
stream, err := client.LatestCrystallizedState(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not setup crystallized beacon state streaming client: %v", err)
@@ -122,7 +178,6 @@ func (s *Service) fetchCrystallizedState(client pb.BeaconServiceClient) {
}
for {
crystallizedState, err := stream.Recv()
// If the stream is closed, we stop the loop.
if err == io.EOF {
break
@@ -131,88 +186,55 @@ func (s *Service) fetchCrystallizedState(client pb.BeaconServiceClient) {
log.Errorf("Could not receive latest crystallized beacon state from stream: %v", err)
continue
}
// After receiving the crystallized state, get its hash, and
// this attester's index in the list.
stateData, err := proto.Marshal(crystallizedState)
if err != nil {
log.Errorf("Could not marshal crystallized state proto: %v", err)
continue
if err := s.processCrystallizedState(crystallizedState); err != nil {
log.Error(err)
}
var crystallizedStateHash [32]byte
h := blake2b.Sum512(stateData)
copy(crystallizedStateHash[:], h[:32])
dynasty := crystallizedState.GetCurrentDynasty()
for i, validator := range crystallizedState.GetValidators() {
if validator.StartDynasty <= dynasty && dynasty < validator.EndDynasty {
activeValidatorIndices = append(activeValidatorIndices, i)
}
}
isValidatorIndexSet := false
for _, val := range activeValidatorIndices {
// TODO: Check the public key instead of withdrawal address. This will use BLS.
if isZeroAddress(crystallizedState.Validators[val].WithdrawalAddress) {
s.validatorIndex = val
isValidatorIndexSet = true
break
}
}
// If validator was not found in the validator set was not set, keep listening for
// crystallized states.
if !isValidatorIndexSet {
log.Debug("Validator index not found in latest crystallized state's active validator list")
continue
}
req := &pb.ShuffleRequest{
CrystallizedStateHash: crystallizedStateHash[:],
}
res, err := client.FetchShuffledValidatorIndices(s.ctx, req)
if err != nil {
log.Errorf("Could not fetch shuffled validator indices: %v", err)
continue
}
shuffledIndices := res.GetShuffledValidatorIndices()
if uint64(s.validatorIndex) == shuffledIndices[len(shuffledIndices)-1] {
// The validator needs to propose the next block.
s.responsibility = "proposer"
log.Debug("Validator selected as proposer of the next slot")
continue
}
// If the condition above did not pass, the validator is an attester.
s.responsibility = "attester"
// Based on the cutoff and assigned slots, determine the beacon block
// slot at which attester has to perform its responsibility.
currentAssignedSlots := res.GetAssignedAttestationSlots()
currentCutoffs := res.GetCutoffIndices()
// The algorithm functions as follows:
// Given a list of slots: [0 19 38 57 12 31 50] and
// A list of cutoff indices: [0 142 285 428 571 714 857 1000]
// if the validator index is between 0-142, it can attest at slot 0, if it is
// between 142-285, that validator can attest at slot 19, etc.
slotIndex := 0
for i := 0; i < len(currentCutoffs)-1; i++ {
lowCutoff := currentCutoffs[i]
highCutoff := currentCutoffs[i+1]
if (uint64(s.validatorIndex) >= lowCutoff) && (uint64(s.validatorIndex) <= highCutoff) {
break
}
slotIndex++
}
s.assignedSlot = currentAssignedSlots[slotIndex]
log.Debug("Validator selected as attester at slot number: %d", s.assignedSlot)
}
}
// fetchProcessedAttestations fetches processed attestations from the beacon node.
func (s *Service) fetchProcessedAttestations(client pb.BeaconServiceClient) {
// processCrystallizedState uses a received crystallized state to determine
// whether a validator is a proposer/attester and the validator's assigned slot.
func (s *Service) processCrystallizedState(crystallizedState *pbp2p.CrystallizedState) error {
var activeValidatorIndices []int
dynasty := crystallizedState.GetCurrentDynasty()
for i, validator := range crystallizedState.GetValidators() {
if validator.StartDynasty <= dynasty && dynasty < validator.EndDynasty {
activeValidatorIndices = append(activeValidatorIndices, i)
}
}
isValidatorIndexSet := false
// We then iteratate over the activeValidatorIndices to determine what index
// this running validator client corresponds to.
for _, val := range activeValidatorIndices {
// TODO(#258): Check the public key instead of withdrawal address. This will use BLS.
if isZeroAddress(crystallizedState.Validators[val].WithdrawalAddress) {
s.validatorIndex = val
isValidatorIndexSet = true
break
}
}
// If validator was not found in the validator set was not set, keep listening for
// crystallized states.
if !isValidatorIndexSet {
log.Debug("Validator index not found in latest crystallized state's active validator list")
return nil
}
// The validator needs to propose the next block.
// TODO(#545): Determine this from a gRPC stream from the beacon node
// instead.
s.responsibility = "proposer"
s.assignedSlot = s.CurrentBeaconSlot() + 2
log.WithField("assignedSlot", s.assignedSlot).Info("Validator selected as proposer")
return nil
}
// listenForProcessedAttestations receives processed attestations from the
// the beacon node's RPC server via gRPC streams.
func (s *Service) listenForProcessedAttestations(client pb.BeaconServiceClient) {
stream, err := client.LatestAttestation(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not setup beacon chain attestation streaming client: %v", err)
@@ -234,6 +256,24 @@ func (s *Service) fetchProcessedAttestations(client pb.BeaconServiceClient) {
}
}
// AttesterAssignmentFeed returns a feed that is written to whenever it is the validator's
// slot to perform attestations.
func (s *Service) AttesterAssignmentFeed() *event.Feed {
return s.attesterAssignmentFeed
}
// ProposerAssignmentFeed returns a feed that is written to whenever it is the validator's
// slot to proposer blocks.
func (s *Service) ProposerAssignmentFeed() *event.Feed {
return s.proposerAssignmentFeed
}
// ProcessedAttestationFeed returns a feed that is written to whenever an attestation
// is processed by a beacon node.
func (s *Service) ProcessedAttestationFeed() *event.Feed {
return s.processedAttestationFeed
}
// isZeroAddress compares a withdrawal address to an empty byte array.
func isZeroAddress(withdrawalAddress []byte) bool {
return bytes.Equal(withdrawalAddress, []byte{})

View File

@@ -9,6 +9,7 @@ import (
"time"
gomock "github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
@@ -30,25 +31,60 @@ type mockClient struct {
func (fc *mockClient) BeaconServiceClient() pb.BeaconServiceClient {
mockServiceClient := internal.NewMockBeaconServiceClient(fc.ctrl)
blockStream := internal.NewMockBeaconService_LatestBeaconBlockClient(fc.ctrl)
blockStream.EXPECT().Recv().Return(&pbp2p.BeaconBlock{}, io.EOF)
stateStream := internal.NewMockBeaconService_LatestCrystallizedStateClient(fc.ctrl)
stateStream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{}, io.EOF)
attesterStream := internal.NewMockBeaconService_LatestAttestationClient(fc.ctrl)
attesterStream.EXPECT().Recv().Return(&pbp2p.AggregatedAttestation{}, io.EOF)
mockServiceClient.EXPECT().LatestBeaconBlock(
gomock.Any(),
&empty.Empty{},
).Return(blockStream, nil)
mockServiceClient.EXPECT().LatestCrystallizedState(
gomock.Any(),
&empty.Empty{},
).Return(stateStream, nil)
mockServiceClient.EXPECT().LatestAttestation(
gomock.Any(),
&empty.Empty{},
).Return(attesterStream, nil)
mockServiceClient.EXPECT().LatestCrystallizedState(
gomock.Any(),
&empty.Empty{},
).Return(stateStream, nil)
return mockServiceClient
}
type mockLifecycleClient struct {
ctrl *gomock.Controller
}
func (fc *mockLifecycleClient) BeaconServiceClient() pb.BeaconServiceClient {
mockServiceClient := internal.NewMockBeaconServiceClient(fc.ctrl)
stateStream := internal.NewMockBeaconService_LatestCrystallizedStateClient(fc.ctrl)
stateStream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{}, io.EOF)
mockServiceClient.EXPECT().LatestCrystallizedState(
gomock.Any(),
&empty.Empty{},
).Return(stateStream, nil)
validator1 := &pbp2p.ValidatorRecord{WithdrawalAddress: []byte("0x0"), StartDynasty: 1, EndDynasty: 10}
validator2 := &pbp2p.ValidatorRecord{WithdrawalAddress: []byte("0x1"), StartDynasty: 1, EndDynasty: 10}
validator3 := &pbp2p.ValidatorRecord{WithdrawalAddress: []byte{}, StartDynasty: 1, EndDynasty: 10}
crystallized := &pbp2p.CrystallizedState{
Validators: []*pbp2p.ValidatorRecord{validator1, validator2, validator3},
CurrentDynasty: 5,
}
mockServiceClient.EXPECT().GenesisTimeAndCanonicalState(
gomock.Any(),
gomock.Any(),
).Return(&pb.GenesisTimeAndStateResponse{
LatestCrystallizedState: crystallized,
GenesisTimestamp: ptypes.TimestampNow(),
}, nil)
attesterStream := internal.NewMockBeaconService_LatestAttestationClient(fc.ctrl)
mockServiceClient.EXPECT().LatestAttestation(
gomock.Any(),
&empty.Empty{},
).Return(attesterStream, nil)
attesterStream.EXPECT().Recv().Return(&pbp2p.AggregatedAttestation{}, io.EOF)
return mockServiceClient
}
@@ -56,7 +92,7 @@ func TestLifecycle(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
b := NewBeaconValidator(context.Background(), &mockLifecycleClient{ctrl})
// Testing basic feeds.
if b.AttesterAssignmentFeed() == nil {
t.Error("AttesterAssignmentFeed empty")
@@ -76,79 +112,133 @@ func TestLifecycle(t *testing.T) {
testutil.AssertLogsContain(t, hook, "Stopping service")
}
func TestFetchBeaconBlocks(t *testing.T) {
func TestCurrentBeaconSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockLifecycleClient{ctrl})
b.genesisTimestamp = time.Now()
if b.CurrentBeaconSlot() != 0 {
t.Errorf("Expected us to be in the 0th slot, received %v", b.CurrentBeaconSlot())
}
}
func TestWaitForAssignmentProposer(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
// Create mock for the stream returned by LatestBeaconBlock.
stream := internal.NewMockBeaconService_LatestBeaconBlockClient(ctrl)
// If the block's slot number from the stream matches the assigned attestation slot,
// trigger a log.
stream.EXPECT().Recv().Return(&pbp2p.BeaconBlock{SlotNumber: 10}, nil)
stream.EXPECT().Recv().Return(&pbp2p.BeaconBlock{}, io.EOF)
b.assignedSlot = 10
b.responsibility = "attester"
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().LatestBeaconBlock(
mockServiceClient.EXPECT().CanonicalHead(
gomock.Any(),
gomock.Any(),
).Return(stream, nil)
).Return(nil, nil)
b.fetchBeaconBlocks(mockServiceClient)
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
go func() {
b.waitForAssignment(timeChan, mockServiceClient)
<-exitRoutine
}()
testutil.AssertLogsContain(t, hook, "Latest beacon block slot number")
testutil.AssertLogsContain(t, hook, "Assigned attestation slot number reached")
// If the validator is assigned to be a proposer, trigger a log upon next
// SlotNumber being reached.
stream = internal.NewMockBeaconService_LatestBeaconBlockClient(ctrl)
stream.EXPECT().Recv().Return(&pbp2p.BeaconBlock{SlotNumber: 1}, nil)
stream.EXPECT().Recv().Return(&pbp2p.BeaconBlock{}, io.EOF)
b.responsibility = "proposer"
b.genesisTimestamp = time.Now()
b.assignedSlot = 0
timeChan <- time.Now()
b.cancel()
exitRoutine <- true
mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().LatestBeaconBlock(
gomock.Any(),
gomock.Any(),
).Return(stream, nil)
b.fetchBeaconBlocks(mockServiceClient)
testutil.AssertLogsContain(t, hook, "Latest beacon block slot number")
testutil.AssertLogsContain(t, hook, "Assigned proposal slot number reached")
// Testing an error coming from the stream.
stream = internal.NewMockBeaconService_LatestBeaconBlockClient(ctrl)
stream.EXPECT().Recv().Return(&pbp2p.BeaconBlock{}, errors.New("stream error"))
stream.EXPECT().Recv().Return(&pbp2p.BeaconBlock{}, io.EOF)
mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().LatestBeaconBlock(
gomock.Any(),
gomock.Any(),
).Return(stream, nil)
b.fetchBeaconBlocks(mockServiceClient)
testutil.AssertLogsContain(t, hook, "stream error")
// Creating a faulty stream will trigger error.
mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().LatestBeaconBlock(
gomock.Any(),
gomock.Any(),
).Return(stream, errors.New("stream creation failed"))
b.fetchBeaconBlocks(mockServiceClient)
testutil.AssertLogsContain(t, hook, "stream creation failed")
testutil.AssertLogsContain(t, hook, "New beacon node slot interval")
}
func TestFetchCrystallizedState(t *testing.T) {
func TestWaitForAssignmentProposerError(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().CanonicalHead(
gomock.Any(),
gomock.Any(),
).Return(nil, errors.New("failed"))
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
go func() {
b.waitForAssignment(timeChan, mockServiceClient)
<-exitRoutine
}()
b.responsibility = "proposer"
b.genesisTimestamp = time.Now()
b.assignedSlot = 0
timeChan <- time.Now()
b.cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "failed")
}
func TestWaitForAssignmentAttester(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().CanonicalHead(
gomock.Any(),
gomock.Any(),
).Return(nil, nil)
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
go func() {
b.waitForAssignment(timeChan, mockServiceClient)
<-exitRoutine
}()
b.responsibility = "attester"
b.genesisTimestamp = time.Now()
b.assignedSlot = 0
timeChan <- time.Now()
b.cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "New beacon node slot interval")
}
func TestWaitForAssignmentAttesterError(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().CanonicalHead(
gomock.Any(),
gomock.Any(),
).Return(nil, errors.New("failed"))
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
go func() {
b.waitForAssignment(timeChan, mockServiceClient)
<-exitRoutine
}()
b.responsibility = "attester"
b.genesisTimestamp = time.Now()
b.assignedSlot = 0
timeChan <- time.Now()
b.cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "failed")
}
func TestListenForCrystallizedStates(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -162,7 +252,7 @@ func TestFetchCrystallizedState(t *testing.T) {
gomock.Any(),
).Return(stream, errors.New("stream creation failed"))
b.fetchCrystallizedState(mockServiceClient)
b.listenForCrystallizedStates(mockServiceClient)
testutil.AssertLogsContain(t, hook, "stream creation failed")
@@ -177,25 +267,10 @@ func TestFetchCrystallizedState(t *testing.T) {
gomock.Any(),
).Return(stream, nil)
b.fetchCrystallizedState(mockServiceClient)
b.listenForCrystallizedStates(mockServiceClient)
testutil.AssertLogsContain(t, hook, "recv error")
// Being unable to marshal the received crystallized state should log an error.
stream = internal.NewMockBeaconService_LatestCrystallizedStateClient(ctrl)
stream.EXPECT().Recv().Return(nil, nil)
stream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{}, io.EOF)
mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().LatestCrystallizedState(
gomock.Any(),
gomock.Any(),
).Return(stream, nil)
b.fetchCrystallizedState(mockServiceClient)
testutil.AssertLogsContain(t, hook, "Could not marshal crystallized state proto")
// If the current validator is not found within the active validators list, log a debug message.
validator := &pbp2p.ValidatorRecord{WithdrawalAddress: []byte("0x01"), StartDynasty: 1, EndDynasty: 10}
stream = internal.NewMockBeaconService_LatestCrystallizedStateClient(ctrl)
@@ -208,31 +283,12 @@ func TestFetchCrystallizedState(t *testing.T) {
gomock.Any(),
).Return(stream, nil)
b.fetchCrystallizedState(mockServiceClient)
b.listenForCrystallizedStates(mockServiceClient)
testutil.AssertLogsContain(t, hook, "Validator index not found in latest crystallized state's active validator list")
// A faulty client.ShuffleValidators should log error.
validator = &pbp2p.ValidatorRecord{WithdrawalAddress: []byte{}, StartDynasty: 1, EndDynasty: 10}
stream = internal.NewMockBeaconService_LatestCrystallizedStateClient(ctrl)
stream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{Validators: []*pbp2p.ValidatorRecord{validator}, CurrentDynasty: 5}, nil)
stream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{}, io.EOF)
mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().LatestCrystallizedState(
gomock.Any(),
gomock.Any(),
).Return(stream, nil)
mockServiceClient.EXPECT().FetchShuffledValidatorIndices(
gomock.Any(),
gomock.Any(),
).Return(nil, errors.New("something went wrong"))
b.fetchCrystallizedState(mockServiceClient)
testutil.AssertLogsContain(t, hook, "Could not fetch shuffled validator indices: something went wrong")
// Slot should be assigned based on the result of ShuffleValidators.
// If the validator is the last index in the shuffled validator indices, it should be assigned
// to be a proposer.
validator1 := &pbp2p.ValidatorRecord{WithdrawalAddress: []byte("0x0"), StartDynasty: 1, EndDynasty: 10}
validator2 := &pbp2p.ValidatorRecord{WithdrawalAddress: []byte("0x1"), StartDynasty: 1, EndDynasty: 10}
validator3 := &pbp2p.ValidatorRecord{WithdrawalAddress: []byte{}, StartDynasty: 1, EndDynasty: 10}
@@ -245,46 +301,13 @@ func TestFetchCrystallizedState(t *testing.T) {
gomock.Any(),
gomock.Any(),
).Return(stream, nil)
mockServiceClient.EXPECT().FetchShuffledValidatorIndices(
gomock.Any(),
gomock.Any(),
).Return(&pb.ShuffleResponse{
AssignedAttestationSlots: []uint64{0, 1, 2},
CutoffIndices: []uint64{0, 1, 2},
ShuffledValidatorIndices: []uint64{2, 1, 0},
}, nil)
b.fetchCrystallizedState(mockServiceClient)
b.listenForCrystallizedStates(mockServiceClient)
testutil.AssertLogsContain(t, hook, "Validator selected as attester")
// If the validator is the last index in the shuffled validator indices, it should be assigned
// to be a proposer.
validator1 = &pbp2p.ValidatorRecord{WithdrawalAddress: []byte("0x0"), StartDynasty: 1, EndDynasty: 10}
validator2 = &pbp2p.ValidatorRecord{WithdrawalAddress: []byte("0x1"), StartDynasty: 1, EndDynasty: 10}
validator3 = &pbp2p.ValidatorRecord{WithdrawalAddress: []byte{}, StartDynasty: 1, EndDynasty: 10}
stream = internal.NewMockBeaconService_LatestCrystallizedStateClient(ctrl)
stream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{Validators: []*pbp2p.ValidatorRecord{validator1, validator2, validator3}, CurrentDynasty: 5}, nil)
stream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{}, io.EOF)
mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().LatestCrystallizedState(
gomock.Any(),
gomock.Any(),
).Return(stream, nil)
mockServiceClient.EXPECT().FetchShuffledValidatorIndices(
gomock.Any(),
gomock.Any(),
).Return(&pb.ShuffleResponse{
ShuffledValidatorIndices: []uint64{0, 1, 2},
}, nil)
b.fetchCrystallizedState(mockServiceClient)
testutil.AssertLogsContain(t, hook, "Validator selected as proposer of the next slot")
testutil.AssertLogsContain(t, hook, "Validator selected as proposer")
}
func TestFetchProcessedAttestations(t *testing.T) {
func TestListenForProcessedAttestations(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -303,7 +326,7 @@ func TestFetchProcessedAttestations(t *testing.T) {
gomock.Any(),
).Return(stream, nil)
b.fetchProcessedAttestations(mockServiceClient)
b.listenForProcessedAttestations(mockServiceClient)
testutil.AssertLogsContain(t, hook, "Latest attestation slot number")
@@ -318,7 +341,7 @@ func TestFetchProcessedAttestations(t *testing.T) {
gomock.Any(),
).Return(stream, nil)
b.fetchProcessedAttestations(mockServiceClient)
b.listenForProcessedAttestations(mockServiceClient)
testutil.AssertLogsContain(t, hook, "stream error")
@@ -329,7 +352,7 @@ func TestFetchProcessedAttestations(t *testing.T) {
gomock.Any(),
).Return(stream, errors.New("stream creation failed"))
b.fetchProcessedAttestations(mockServiceClient)
b.listenForProcessedAttestations(mockServiceClient)
testutil.AssertLogsContain(t, hook, "stream creation failed")
testutil.AssertLogsContain(t, hook, "Could not receive latest attestation from stream")
}

View File

@@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1 (interfaces: BeaconServiceClient,BeaconService_LatestBeaconBlockClient,BeaconService_LatestCrystallizedStateClient,BeaconService_LatestAttestationClient)
// Source: github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1 (interfaces: BeaconServiceClient,BeaconService_LatestCrystallizedStateClient,BeaconService_LatestAttestationClient)
package internal
@@ -38,22 +38,40 @@ func (m *MockBeaconServiceClient) EXPECT() *MockBeaconServiceClientMockRecorder
return m.recorder
}
// FetchShuffledValidatorIndices mocks base method
func (m *MockBeaconServiceClient) FetchShuffledValidatorIndices(arg0 context.Context, arg1 *v10.ShuffleRequest, arg2 ...grpc.CallOption) (*v10.ShuffleResponse, error) {
// CanonicalHead mocks base method
func (m *MockBeaconServiceClient) CanonicalHead(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*v1.BeaconBlock, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "FetchShuffledValidatorIndices", varargs...)
ret0, _ := ret[0].(*v10.ShuffleResponse)
ret := m.ctrl.Call(m, "CanonicalHead", varargs...)
ret0, _ := ret[0].(*v1.BeaconBlock)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchShuffledValidatorIndices indicates an expected call of FetchShuffledValidatorIndices
func (mr *MockBeaconServiceClientMockRecorder) FetchShuffledValidatorIndices(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
// CanonicalHead indicates an expected call of CanonicalHead
func (mr *MockBeaconServiceClientMockRecorder) CanonicalHead(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchShuffledValidatorIndices", reflect.TypeOf((*MockBeaconServiceClient)(nil).FetchShuffledValidatorIndices), varargs...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanonicalHead", reflect.TypeOf((*MockBeaconServiceClient)(nil).CanonicalHead), varargs...)
}
// GenesisTimeAndCanonicalState mocks base method
func (m *MockBeaconServiceClient) GenesisTimeAndCanonicalState(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*v10.GenesisTimeAndStateResponse, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "GenesisTimeAndCanonicalState", varargs...)
ret0, _ := ret[0].(*v10.GenesisTimeAndStateResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GenesisTimeAndCanonicalState indicates an expected call of GenesisTimeAndCanonicalState
func (mr *MockBeaconServiceClientMockRecorder) GenesisTimeAndCanonicalState(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenesisTimeAndCanonicalState", reflect.TypeOf((*MockBeaconServiceClient)(nil).GenesisTimeAndCanonicalState), varargs...)
}
// LatestAttestation mocks base method
@@ -74,24 +92,6 @@ func (mr *MockBeaconServiceClientMockRecorder) LatestAttestation(arg0, arg1 inte
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestAttestation", reflect.TypeOf((*MockBeaconServiceClient)(nil).LatestAttestation), varargs...)
}
// LatestBeaconBlock mocks base method
func (m *MockBeaconServiceClient) LatestBeaconBlock(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (v10.BeaconService_LatestBeaconBlockClient, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "LatestBeaconBlock", varargs...)
ret0, _ := ret[0].(v10.BeaconService_LatestBeaconBlockClient)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// LatestBeaconBlock indicates an expected call of LatestBeaconBlock
func (mr *MockBeaconServiceClientMockRecorder) LatestBeaconBlock(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestBeaconBlock", reflect.TypeOf((*MockBeaconServiceClient)(nil).LatestBeaconBlock), varargs...)
}
// LatestCrystallizedState mocks base method
func (m *MockBeaconServiceClient) LatestCrystallizedState(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (v10.BeaconService_LatestCrystallizedStateClient, error) {
varargs := []interface{}{arg0, arg1}
@@ -110,115 +110,6 @@ func (mr *MockBeaconServiceClientMockRecorder) LatestCrystallizedState(arg0, arg
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestCrystallizedState", reflect.TypeOf((*MockBeaconServiceClient)(nil).LatestCrystallizedState), varargs...)
}
// MockBeaconService_LatestBeaconBlockClient is a mock of BeaconService_LatestBeaconBlockClient interface
type MockBeaconService_LatestBeaconBlockClient struct {
ctrl *gomock.Controller
recorder *MockBeaconService_LatestBeaconBlockClientMockRecorder
}
// MockBeaconService_LatestBeaconBlockClientMockRecorder is the mock recorder for MockBeaconService_LatestBeaconBlockClient
type MockBeaconService_LatestBeaconBlockClientMockRecorder struct {
mock *MockBeaconService_LatestBeaconBlockClient
}
// NewMockBeaconService_LatestBeaconBlockClient creates a new mock instance
func NewMockBeaconService_LatestBeaconBlockClient(ctrl *gomock.Controller) *MockBeaconService_LatestBeaconBlockClient {
mock := &MockBeaconService_LatestBeaconBlockClient{ctrl: ctrl}
mock.recorder = &MockBeaconService_LatestBeaconBlockClientMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockBeaconService_LatestBeaconBlockClient) EXPECT() *MockBeaconService_LatestBeaconBlockClientMockRecorder {
return m.recorder
}
// CloseSend mocks base method
func (m *MockBeaconService_LatestBeaconBlockClient) CloseSend() error {
ret := m.ctrl.Call(m, "CloseSend")
ret0, _ := ret[0].(error)
return ret0
}
// CloseSend indicates an expected call of CloseSend
func (mr *MockBeaconService_LatestBeaconBlockClientMockRecorder) CloseSend() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockBeaconService_LatestBeaconBlockClient)(nil).CloseSend))
}
// Context mocks base method
func (m *MockBeaconService_LatestBeaconBlockClient) Context() context.Context {
ret := m.ctrl.Call(m, "Context")
ret0, _ := ret[0].(context.Context)
return ret0
}
// Context indicates an expected call of Context
func (mr *MockBeaconService_LatestBeaconBlockClientMockRecorder) Context() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBeaconService_LatestBeaconBlockClient)(nil).Context))
}
// Header mocks base method
func (m *MockBeaconService_LatestBeaconBlockClient) Header() (metadata.MD, error) {
ret := m.ctrl.Call(m, "Header")
ret0, _ := ret[0].(metadata.MD)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Header indicates an expected call of Header
func (mr *MockBeaconService_LatestBeaconBlockClientMockRecorder) Header() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockBeaconService_LatestBeaconBlockClient)(nil).Header))
}
// Recv mocks base method
func (m *MockBeaconService_LatestBeaconBlockClient) Recv() (*v1.BeaconBlock, error) {
ret := m.ctrl.Call(m, "Recv")
ret0, _ := ret[0].(*v1.BeaconBlock)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Recv indicates an expected call of Recv
func (mr *MockBeaconService_LatestBeaconBlockClientMockRecorder) Recv() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockBeaconService_LatestBeaconBlockClient)(nil).Recv))
}
// RecvMsg mocks base method
func (m *MockBeaconService_LatestBeaconBlockClient) RecvMsg(arg0 interface{}) error {
ret := m.ctrl.Call(m, "RecvMsg", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// RecvMsg indicates an expected call of RecvMsg
func (mr *MockBeaconService_LatestBeaconBlockClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBeaconService_LatestBeaconBlockClient)(nil).RecvMsg), arg0)
}
// SendMsg mocks base method
func (m *MockBeaconService_LatestBeaconBlockClient) SendMsg(arg0 interface{}) error {
ret := m.ctrl.Call(m, "SendMsg", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// SendMsg indicates an expected call of SendMsg
func (mr *MockBeaconService_LatestBeaconBlockClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBeaconService_LatestBeaconBlockClient)(nil).SendMsg), arg0)
}
// Trailer mocks base method
func (m *MockBeaconService_LatestBeaconBlockClient) Trailer() metadata.MD {
ret := m.ctrl.Call(m, "Trailer")
ret0, _ := ret[0].(metadata.MD)
return ret0
}
// Trailer indicates an expected call of Trailer
func (mr *MockBeaconService_LatestBeaconBlockClientMockRecorder) Trailer() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockBeaconService_LatestBeaconBlockClient)(nil).Trailer))
}
// MockBeaconService_LatestCrystallizedStateClient is a mock of BeaconService_LatestCrystallizedStateClient interface
type MockBeaconService_LatestCrystallizedStateClient struct {
ctrl *gomock.Controller

View File

@@ -11,6 +11,7 @@ import (
func DefaultConfig() *Config {
return &Config{
CollationSizeLimit: DefaultCollationSizeLimit(),
SlotDuration: 8,
}
}
@@ -23,4 +24,5 @@ func DefaultCollationSizeLimit() int64 {
// Config contains configs for node to participate in the sharded universe.
type Config struct {
CollationSizeLimit int64 // CollationSizeLimit is the maximum size the serialized blobs in a collation can take.
SlotDuration int // SlotDuration in seconds.
}

View File

@@ -42,7 +42,7 @@ type Proposer struct {
attestationService rpcAttestationService
attestationChan chan *pbp2p.AggregatedAttestation
pendingAttestation []*pbp2p.AggregatedAttestation
mutex *sync.Mutex
lock sync.Mutex
}
// Config options for proposer service.
@@ -66,7 +66,7 @@ func NewProposer(ctx context.Context, cfg *Config) *Proposer {
assignmentChan: make(chan *pbp2p.BeaconBlock, cfg.AssignmentBuf),
attestationChan: make(chan *pbp2p.AggregatedAttestation, cfg.AttestationBufferSize),
pendingAttestation: make([]*pbp2p.AggregatedAttestation, 0),
mutex: &sync.Mutex{},
lock: sync.Mutex{},
}
}
@@ -130,7 +130,6 @@ func (p *Proposer) processAttestation(done <-chan struct{}) {
log.Debug("Proposer context closed, exiting goroutine")
return
case attestationRecord := <-p.attestationChan:
attestationExists := p.DoesAttestationExist(attestationRecord)
if !attestationExists {
p.AddPendingAttestation(attestationRecord)
@@ -151,10 +150,6 @@ func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) {
case <-done:
log.Debug("Proposer context closed, exiting goroutine")
return
// TODO: On the beacon node side, calculate active and crystallized and update the
// active/crystallize state hash values in the proposed block.
// When we receive an assignment on a slot, we leverage the fields
// from the latest canonical beacon block to perform a proposal responsibility.
case latestBeaconBlock := <-p.assignmentChan:
@@ -170,21 +165,21 @@ func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) {
latestBlockHash := blake2b.Sum512(data)
// To prevent any unaccounted attestations from being added.
p.mutex.Lock()
p.lock.Lock()
agSig := p.AggregateAllSignatures(p.pendingAttestation)
bitmask := p.GenerateBitmask(p.pendingAttestation)
// TODO: Implement real proposals with randao reveals and attestation fields.
req := &pb.ProposeRequest{
ParentHash: latestBlockHash[:],
ParentHash: latestBlockHash[:],
// TODO: Fix to be the actual, timebased slot number instead.
SlotNumber: latestBeaconBlock.GetSlotNumber() + 1,
RandaoReveal: []byte{},
AttestationBitmask: bitmask,
AttestationAggregateSig: agSig,
Timestamp: ptypes.TimestampNow(),
}
res, err := client.ProposeBlock(p.ctx, req)
if err != nil {
log.Errorf("Could not propose block: %v", err)
@@ -193,7 +188,7 @@ func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) {
log.Infof("Block proposed successfully with hash 0x%x", res.BlockHash)
p.pendingAttestation = nil
p.mutex.Unlock()
p.lock.Unlock()
}
}
}