Validator Client Use Assignment Stream to Determine Responsibility (#566)

This commit is contained in:
terence tsao
2018-09-27 14:46:07 -07:00
committed by GitHub
parent bc7e07d5dd
commit 2566687db7
13 changed files with 173 additions and 140 deletions

View File

@@ -73,12 +73,12 @@ func NewBeaconChain(genesisJSON string, db ethdb.Database) (*BeaconChain, error)
return nil, err
}
}
if !hasCrystallized {
log.Info("No chainstate found on disk, initializing beacon from genesis")
beaconChain.state.CrystallizedState = crystallized
return beaconChain, nil
}
enc, err := db.Get(crystallizedStateLookupKey)
if err != nil {
return nil, err

View File

@@ -1,6 +1,7 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1 (interfaces: BeaconServiceServer,BeaconService_LatestAttestationServer,BeaconService_ValidatorAssignmentsServer)
// Package mock_v1 is a generated GoMock package.
package internal
import (

View File

@@ -68,6 +68,7 @@ VERSION:
utils.RPCPort,
utils.CertFlag,
utils.KeyFlag,
utils.GenesisJSON,
cmd.DataDirFlag,
cmd.VerbosityFlag,
cmd.EnableTracingFlag,

View File

@@ -180,7 +180,6 @@ func (s *Service) CurrentAssignmentsAndGenesisTime(ctx context.Context, req *pb.
// #nosec G104
protoGenesis, _ := ptypes.TimestampProto(params.GenesisTime)
cState := s.chainService.CurrentCrystallizedState()
var keys []*pb.PublicKey
if req.AllValidators {
for _, val := range cState.Validators() {
@@ -248,6 +247,7 @@ func (s *Service) AttestHead(ctx context.Context, req *pb.AttestRequest) (*pb.At
// LatestAttestation streams the latest processed attestations to the rpc clients.
func (s *Service) LatestAttestation(req *empty.Empty, stream pb.BeaconService_LatestAttestationServer) error {
sub := s.attestationService.IncomingAttestationFeed().Subscribe(s.incomingAttestation)
defer sub.Unsubscribe()
for {
@@ -415,7 +415,6 @@ func assignmentsForPublicKeys(keys []*pb.PublicKey, cState *types.CrystallizedSt
if err != nil {
return nil, err
}
assignments = append(assignments, &pb.Assignment{
PublicKey: val,
ShardId: shardID,

View File

@@ -13,6 +13,7 @@ func TestRealClockIsAccurate(t *testing.T) {
if clockTime != actualTime {
t.Errorf("The time from the Clock interface should equal the actual time. Got: %v, Expected: %v", clockTime, actualTime)
}
BlockingWait(0)
CurrentBeaconSlot()
}

View File

@@ -1,14 +1,54 @@
{
"validators":[
{"publicKey":"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEB","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"AgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgIC","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"AwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMD","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"BAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"BQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUF","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"BgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYGBgYG","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"CAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgI","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"CQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJ","balance":"32","endDynasty":"999999999999999999"},
{"publicKey":"CgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoK","balance":"32","endDynasty":"999999999999999999"}
"validators": [
{
"publicKey": "QUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFB",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "QkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJC",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "Q0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0NDQ0ND",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "RERERERERERERERERERERERERERERERERERERERERERERERERERERERERERERERE",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "RUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVFRUVF",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "RkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZG",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "R0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dHR0dH",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "SEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhI",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "SUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJSUlJ",
"balance": "32",
"endDynasty": "999999999999999999"
},
{
"publicKey": "SkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpKSkpK",
"balance": "32",
"endDynasty": "999999999999999999"
}
]
}
}

View File

@@ -21,8 +21,9 @@ type rpcClientService interface {
ValidatorServiceClient() pb.ValidatorServiceClient
}
type assignmentAnnouncer interface {
type beaconClientService interface {
AttesterAssignmentFeed() *event.Feed
PublicKey() []byte
}
// Attester holds functionality required to run a block attester
@@ -30,20 +31,18 @@ type assignmentAnnouncer interface {
type Attester struct {
ctx context.Context
cancel context.CancelFunc
assigner assignmentAnnouncer
beaconService beaconClientService
rpcClientService rpcClientService
assignmentChan chan *pbp2p.BeaconBlock
shardID uint64
pubKey []byte
}
// Config options for an attester service.
type Config struct {
AssignmentBuf int
ShardID uint64
Assigner assignmentAnnouncer
Assigner beaconClientService
Client rpcClientService
Pubkey []byte
}
// NewAttester creates a new attester instance.
@@ -52,10 +51,9 @@ func NewAttester(ctx context.Context, cfg *Config) *Attester {
return &Attester{
ctx: ctx,
cancel: cancel,
assigner: cfg.Assigner,
beaconService: cfg.Assigner,
rpcClientService: cfg.Client,
shardID: cfg.ShardID,
pubKey: cfg.Pubkey,
assignmentChan: make(chan *pbp2p.BeaconBlock, cfg.AssignmentBuf),
}
}
@@ -77,7 +75,7 @@ func (a *Attester) Stop() error {
// run the main event loop that listens for an attester assignment.
func (a *Attester) run(attester pb.AttesterServiceClient, validator pb.ValidatorServiceClient) {
sub := a.assigner.AttesterAssignmentFeed().Subscribe(a.assignmentChan)
sub := a.beaconService.AttesterAssignmentFeed().Subscribe(a.assignmentChan)
defer sub.Unsubscribe()
for {
@@ -96,7 +94,7 @@ func (a *Attester) run(attester pb.AttesterServiceClient, validator pb.Validator
latestBlockHash := blake2b.Sum512(data)
pubKeyReq := &pb.PublicKey{
PublicKey: a.pubKey,
PublicKey: a.beaconService.PublicKey(),
}
shardID, err := validator.ValidatorShardID(a.ctx, pubKeyReq)
if err != nil {

View File

@@ -39,6 +39,10 @@ func (m *mockAssigner) AttesterAssignmentFeed() *event.Feed {
return new(event.Feed)
}
func (m *mockAssigner) PublicKey() []byte {
return []byte{}
}
func TestLifecycle(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)

View File

@@ -26,6 +26,7 @@ type rpcClientService interface {
type Service struct {
ctx context.Context
cancel context.CancelFunc
pubKey []byte
rpcClient rpcClientService
assignedSlot uint64
shardID uint64
@@ -39,10 +40,11 @@ type Service struct {
// NewBeaconValidator instantiates a service that interacts with a beacon node
// via gRPC requests.
func NewBeaconValidator(ctx context.Context, rpcClient rpcClientService) *Service {
func NewBeaconValidator(ctx context.Context, pubKey []byte, rpcClient rpcClientService) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
pubKey: pubKey,
cancel: cancel,
rpcClient: rpcClient,
attesterAssignmentFeed: new(event.Feed),
@@ -77,17 +79,18 @@ func (s *Service) Start() {
// so the validator and beacon node's internal tickers are aligned.
utils.BlockingWait(s.slotAlignmentDuration)
// We kick off a routine that listens for stream of validator assignment coming from
// beacon node. This will update validator client on which slot, shard ID and what
// responsbility to perform.
go s.listenForAssignmentChange(beaconServiceClient)
slotTicker := time.NewTicker(s.slotAlignmentDuration)
go s.waitForAssignment(slotTicker.C, beaconServiceClient)
// We then kick off a routine that listens for streams of cycle transitions
// coming from the beacon node. This will allow the validator client to recalculate
// when it has to perform its responsibilities.
go s.listenForCycleTransitions(beaconServiceClient)
go s.listenForProcessedAttestations(beaconServiceClient)
}
// Stop the main loop..
// Stop the main loop.
func (s *Service) Stop() error {
defer s.cancel()
log.Info("Stopping service")
@@ -127,79 +130,34 @@ func (s *Service) fetchCurrentAssignmentsAndGenesisTime(client pb.BeaconServiceC
log.Fatalf("cannot compute genesis timestamp: %v", err)
}
log.Infof("Setting validator genesis time to %d", genesisTimestamp.Unix())
s.genesisTimestamp = genesisTimestamp
for _, assign := range res.Assignments {
if bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) {
s.role = assign.Role
s.assignedSlot = assign.AssignedSlot
s.shardID = assign.ShardId
// Loops through the received assignments to determine which one
// corresponds to this validator client based on a matching public key.
for _, assignment := range res.GetAssignments() {
// TODO(#566): Determine assignment based on public key flag.
pubKeyProto := assignment.GetPublicKey()
if isZeroAddress(pubKeyProto.GetPublicKey()) {
s.assignedSlot = s.CurrentCycleStartSlot() + assignment.GetAssignedSlot()
s.shardID = assignment.GetShardId()
s.role = assignment.GetRole()
break
}
}
if s.role == pb.ValidatorRole_PROPOSER {
log.Infof("Assigned as PROPOSER to slot %v, shardID: %v", s.assignedSlot, s.shardID)
} else {
log.Infof("Assigned as ATTESTER to slot %v, shardID: %v", s.assignedSlot, s.shardID)
}
if s.CurrentBeaconSlot() > s.assignedSlot {
log.Info("You joined a bit too late -the current slot is greater than assigned slot in the cycle, wait until next cycle to be re-assigned")
}
}
// waitForAssignment kicks off once the validator determines the currentSlot of the
// beacon node by calculating the difference between the current system time
// and the genesis timestamp. It runs exactly every SLOT_LENGTH seconds
// and checks if it is time for the validator to act as a proposer or attester.
func (s *Service) waitForAssignment(ticker <-chan time.Time, client pb.BeaconServiceClient) {
for {
select {
case <-s.ctx.Done():
return
case <-ticker:
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("New beacon node slot")
if s.role == pb.ValidatorRole_PROPOSER && s.assignedSlot == s.CurrentBeaconSlot() {
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned proposal slot number reached")
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
// We forward the latest canonical block to the proposer service via a feed.
s.proposerAssignmentFeed.Send(block)
} else if s.role == pb.ValidatorRole_ATTESTER && s.assignedSlot == s.CurrentBeaconSlot() {
log.Info("Assigned attestation slot number reached")
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
// We forward the latest canonical block to the attester service a feed.
s.attesterAssignmentFeed.Send(block)
}
log.Infof("Validator shuffled. Pub key 0x%s re-assigned to shard ID %d for %v duty at slot %d",
string(s.pubKey),
s.assignedSlot,
s.role,
s.assignedSlot)
}
}
}
// listenForCycleTransitions receives validator assignments from the
// the beacon node's RPC server when a new cycle transition occurs.
func (s *Service) listenForCycleTransitions(client pb.BeaconServiceClient) {
// Currently fetches assignments for all validators.
req := &pb.ValidatorAssignmentRequest{
AllValidators: true,
}
// listenForAssignmentChange listens for validator assignment changes via a RPC stream.
// when there's an assignment change, beacon service will update its shard ID, slot number and role.
func (s *Service) listenForAssignmentChange(client pb.BeaconServiceClient) {
req := &pb.ValidatorAssignmentRequest{PublicKeys: []*pb.PublicKey{{PublicKey: s.pubKey}}}
stream, err := client.ValidatorAssignments(s.ctx, req)
if err != nil {
log.Errorf("Could not setup validator assignments streaming client: %v", err)
log.Errorf("could not fetch validator assigned slot and responsibility from beacon node: %v", err)
return
}
for {
res, err := stream.Recv()
assignment, err := stream.Recv()
// If the stream is closed, we stop the loop.
if err == io.EOF {
break
@@ -211,28 +169,55 @@ func (s *Service) listenForCycleTransitions(client pb.BeaconServiceClient) {
}
if err != nil {
log.Errorf("Could not receive validator assignments from stream: %v", err)
log.Errorf("Could not receive latest validator assignment from stream: %v", err)
continue
}
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("New cycle transition")
for _, assign := range assignment.Assignments {
if bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) {
s.role = assign.Role
s.assignedSlot = assign.AssignedSlot
s.shardID = assign.ShardId
// Loops through the received assignments to determine which one
// corresponds to this validator client based on a matching public key.
for _, assignment := range res.GetAssignments() {
// TODO(#566): Determine assignment based on public key flag.
pubKeyProto := assignment.GetPublicKey()
if isZeroAddress(pubKeyProto.GetPublicKey()) {
s.assignedSlot = s.CurrentCycleStartSlot() + assignment.GetAssignedSlot()
s.shardID = assignment.GetShardId()
s.role = assignment.GetRole()
break
log.Infof("Validator with pub key 0x%s re-assigned to shard ID %d for %v duty at slot %d",
string(s.pubKey),
s.assignedSlot,
s.role,
s.assignedSlot)
}
}
if s.role == pb.ValidatorRole_PROPOSER {
log.Infof("Assigned as PROPOSER to slot %v, shardID: %v", s.assignedSlot, s.shardID)
} else {
log.Infof("Assigned as ATTESTER to slot %v, shardID: %v", s.assignedSlot, s.shardID)
}
}
// waitForAssignment waits till it's validator's role to attest or propose. Then it forwards
// the canonical block to the proposer service or attester service to process.
func (s *Service) waitForAssignment(ticker <-chan time.Time, client pb.BeaconServiceClient) {
for {
select {
case <-s.ctx.Done():
return
case <-ticker:
if s.role == pb.ValidatorRole_ATTESTER && s.assignedSlot == s.CurrentBeaconSlot() {
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned attest slot number reached")
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
// We forward the latest canonical block to the attester service a feed.
s.attesterAssignmentFeed.Send(block)
} else if s.role == pb.ValidatorRole_PROPOSER && s.assignedSlot == s.CurrentBeaconSlot() {
log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned proposal slot number reached")
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
if err != nil {
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
continue
}
// We forward the latest canonical block to the proposer service a feed.
s.proposerAssignmentFeed.Send(block)
}
}
}
}
@@ -261,6 +246,7 @@ func (s *Service) listenForProcessedAttestations(client pb.BeaconServiceClient)
log.Errorf("Could not receive latest attestation from stream: %v", err)
continue
}
log.WithField("slotNumber", attestation.GetSlot()).Info("Latest attestation slot number")
s.processedAttestationFeed.Send(attestation)
}
@@ -284,6 +270,11 @@ func (s *Service) ProcessedAttestationFeed() *event.Feed {
return s.processedAttestationFeed
}
// PublicKey returns validator's public key.
func (s *Service) PublicKey() []byte {
return s.pubKey
}
// CurrentBeaconSlot based on the genesis timestamp of the protocol.
func (s *Service) CurrentBeaconSlot() uint64 {
secondsSinceGenesis := time.Since(s.genesisTimestamp).Seconds()
@@ -296,8 +287,3 @@ func (s *Service) CurrentCycleStartSlot() uint64 {
cycleNum := math.Floor(float64(currentSlot) / float64(params.DefaultConfig().CycleLength))
return uint64(cycleNum) * params.DefaultConfig().CycleLength
}
// isZeroAddress compares a withdrawal address to an empty byte array.
func isZeroAddress(withdrawalAddress []byte) bool {
return bytes.Equal(withdrawalAddress, []byte{})
}

View File

@@ -1,6 +1,7 @@
package beacon
import (
"bytes"
"context"
"errors"
"io"
@@ -8,7 +9,7 @@ import (
"testing"
"time"
gomock "github.com/golang/mock/gomock"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -77,7 +78,7 @@ func TestLifecycle(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockLifecycleClient{ctrl})
b := NewBeaconValidator(context.Background(), []byte{}, &mockLifecycleClient{ctrl})
// Testing basic feeds.
if b.AttesterAssignmentFeed() == nil {
t.Error("AttesterAssignmentFeed empty")
@@ -88,6 +89,9 @@ func TestLifecycle(t *testing.T) {
if b.ProcessedAttestationFeed() == nil {
t.Error("ProcessedAttestationFeed empty")
}
if !bytes.Equal(b.PublicKey(), []byte{}) {
t.Error("Incorrect public key")
}
b.slotAlignmentDuration = time.Millisecond * 10
b.Start()
time.Sleep(time.Millisecond * 10)
@@ -99,7 +103,7 @@ func TestLifecycle(t *testing.T) {
func TestCurrentBeaconSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockLifecycleClient{ctrl})
b := NewBeaconValidator(context.Background(), []byte{}, &mockLifecycleClient{ctrl})
b.genesisTimestamp = time.Now()
if b.CurrentBeaconSlot() != 0 {
t.Errorf("Expected us to be in the 0th slot, received %v", b.CurrentBeaconSlot())
@@ -110,7 +114,7 @@ func TestWaitForAssignmentProposer(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
b := NewBeaconValidator(context.Background(), []byte{}, &mockClient{ctrl})
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().CanonicalHead(
@@ -132,14 +136,14 @@ func TestWaitForAssignmentProposer(t *testing.T) {
b.cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "New beacon node slot")
testutil.AssertLogsContain(t, hook, "Assigned proposal slot number reached")
}
func TestWaitForAssignmentProposerError(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
b := NewBeaconValidator(context.Background(), []byte{}, &mockClient{ctrl})
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().CanonicalHead(
@@ -168,7 +172,7 @@ func TestWaitForAssignmentAttester(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
b := NewBeaconValidator(context.Background(), []byte{}, &mockClient{ctrl})
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().CanonicalHead(
@@ -190,14 +194,14 @@ func TestWaitForAssignmentAttester(t *testing.T) {
b.cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "New beacon node slot")
testutil.AssertLogsContain(t, hook, "Assigned attest slot number reached")
}
func TestWaitForAssignmentAttesterError(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
b := NewBeaconValidator(context.Background(), []byte{}, &mockClient{ctrl})
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
mockServiceClient.EXPECT().CanonicalHead(
@@ -226,7 +230,7 @@ func TestListenForProcessedAttestations(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
b := NewBeaconValidator(context.Background(), []byte{}, &mockClient{ctrl})
// Create mock for the stream returned by LatestAttestation.
stream := internal.NewMockBeaconService_LatestAttestationClient(ctrl)

View File

@@ -1,6 +1,7 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1 (interfaces: BeaconServiceClient,BeaconService_LatestAttestationClient,BeaconService_ValidatorAssignmentsClient)
// Package mock_v1 is a generated GoMock package.
package internal
import (
@@ -62,6 +63,7 @@ func (m *MockBeaconServiceClient) CurrentAssignmentsAndGenesisTime(arg0 context.
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "CurrentAssignmentsAndGenesisTime", varargs...)
ret0, _ := ret[0].(*v10.CurrentAssignmentsResponse)
ret1, _ := ret[1].(error)

View File

@@ -57,6 +57,7 @@ func NewShardInstance(ctx *cli.Context) (*ShardEthereum, error) {
services: registry,
stop: make(chan struct{}),
}
var pubKey []byte
inputKey := ctx.GlobalString(types.PubKeyFlag.Name)
@@ -88,15 +89,15 @@ func NewShardInstance(ctx *cli.Context) (*ShardEthereum, error) {
return nil, err
}
if err := shardEthereum.registerBeaconService(); err != nil {
if err := shardEthereum.registerBeaconService(pubKey); err != nil {
return nil, err
}
if err := shardEthereum.registerAttesterService(pubKey); err != nil {
if err := shardEthereum.registerAttesterService(); err != nil {
return nil, err
}
if err := shardEthereum.registerProposerService(pubKey); err != nil {
if err := shardEthereum.registerProposerService(); err != nil {
return nil, err
}
@@ -188,17 +189,18 @@ func (s *ShardEthereum) registerTXPool() error {
// registerBeaconService registers a service that fetches streams from a beacon node
// via RPC.
func (s *ShardEthereum) registerBeaconService() error {
func (s *ShardEthereum) registerBeaconService(pubKey []byte) error {
var rpcService *rpcclient.Service
if err := s.services.FetchService(&rpcService); err != nil {
return err
}
b := beacon.NewBeaconValidator(context.TODO(), rpcService)
b := beacon.NewBeaconValidator(context.TODO(), pubKey, rpcService)
return s.services.RegisterService(b)
}
// registerAttesterService that listens to assignments from the beacon service.
func (s *ShardEthereum) registerAttesterService(pubkey []byte) error {
func (s *ShardEthereum) registerAttesterService() error {
var beaconService *beacon.Service
if err := s.services.FetchService(&beaconService); err != nil {
return err
@@ -213,13 +215,12 @@ func (s *ShardEthereum) registerAttesterService(pubkey []byte) error {
Assigner: beaconService,
AssignmentBuf: 100,
Client: rpcService,
Pubkey: pubkey,
})
return s.services.RegisterService(att)
}
// registerProposerService that listens to assignments from the beacon service.
func (s *ShardEthereum) registerProposerService(pubKey []byte) error {
func (s *ShardEthereum) registerProposerService() error {
var rpcService *rpcclient.Service
if err := s.services.FetchService(&rpcService); err != nil {
return err
@@ -235,7 +236,6 @@ func (s *ShardEthereum) registerProposerService(pubKey []byte) error {
AssignmentBuf: 100,
AttestationBufferSize: 100,
AttesterFeed: beaconService,
Pubkey: pubKey,
})
return s.services.RegisterService(prop)
}

View File

@@ -22,7 +22,7 @@ type rpcClientService interface {
ProposerServiceClient() pb.ProposerServiceClient
}
type assignmentAnnouncer interface {
type beaconClientService interface {
ProposerAssignmentFeed() *event.Feed
}
@@ -36,24 +36,22 @@ type rpcAttestationService interface {
type Proposer struct {
ctx context.Context
cancel context.CancelFunc
assigner assignmentAnnouncer
beaconService beaconClientService
rpcClientService rpcClientService
assignmentChan chan *pbp2p.BeaconBlock
attestationService rpcAttestationService
attestationChan chan *pbp2p.AggregatedAttestation
pendingAttestation []*pbp2p.AggregatedAttestation
lock sync.Mutex
pubkey []byte
}
// Config options for proposer service.
type Config struct {
AssignmentBuf int
AttestationBufferSize int
Assigner assignmentAnnouncer
Assigner beaconClientService
AttesterFeed rpcAttestationService
Client rpcClientService
Pubkey []byte
}
// NewProposer creates a new attester instance.
@@ -62,10 +60,9 @@ func NewProposer(ctx context.Context, cfg *Config) *Proposer {
return &Proposer{
ctx: ctx,
cancel: cancel,
assigner: cfg.Assigner,
beaconService: cfg.Assigner,
rpcClientService: cfg.Client,
attestationService: cfg.AttesterFeed,
pubkey: cfg.Pubkey,
assignmentChan: make(chan *pbp2p.BeaconBlock, cfg.AssignmentBuf),
attestationChan: make(chan *pbp2p.AggregatedAttestation, cfg.AttestationBufferSize),
pendingAttestation: make([]*pbp2p.AggregatedAttestation, 0),
@@ -145,7 +142,7 @@ func (p *Proposer) processAttestation(done <-chan struct{}) {
// run the main event loop that listens for a proposer assignment.
func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) {
sub := p.assigner.ProposerAssignmentFeed().Subscribe(p.assignmentChan)
sub := p.beaconService.ProposerAssignmentFeed().Subscribe(p.assignmentChan)
defer sub.Unsubscribe()
for {