mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Implement RPC Messaging Between Validators (#475)
* Adding Proto files * Move to shared package * adding attestation sub * gazelle * attestation check * proposal finished * Add in attesters responsibilities * fixing dependency issues * adding topics * refactoring tests * Adding more tests * adding more changes * gazelle * removing attester p2p * remove mock * changing to new proposer model * changing tests * making changes * gazelle * adding gomock * adding rpc methods, reverting changes to other proto files * gazelle and test changes * adding tests * adding mocks and tests * gazelle * fixing merge issues * lint * lint
This commit is contained in:
@@ -73,6 +73,7 @@ func (a *Attester) Stop() error {
|
||||
func (a *Attester) run(done <-chan struct{}, client pb.AttesterServiceClient) {
|
||||
sub := a.assigner.AttesterAssignmentFeed().Subscribe(a.assignmentChan)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
@@ -92,9 +93,9 @@ func (a *Attester) run(done <-chan struct{}, client pb.AttesterServiceClient) {
|
||||
Attestation: &pbp2p.AggregatedAttestation{
|
||||
Slot: latestBeaconBlock.GetSlotNumber(),
|
||||
ShardId: a.shardID,
|
||||
ShardBlockHash: latestBlockHash[:],
|
||||
AttesterBitfield: []byte{}, // TODO: Need to find which index this attester represents.
|
||||
AggregateSig: []uint64{}, // TODO: Need Signature verification scheme/library
|
||||
ShardBlockHash: latestBlockHash[:], // Is a stub for actual shard blockhash.
|
||||
AttesterBitfield: []byte{}, // TODO: Need to find which index this attester represents.
|
||||
AggregateSig: []uint64{}, // TODO: Need Signature verification scheme/library
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -21,25 +21,27 @@ type rpcClientService interface {
|
||||
|
||||
// Service that interacts with a beacon node via RPC.
|
||||
type Service struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
rpcClient rpcClientService
|
||||
validatorIndex int
|
||||
assignedSlot uint64
|
||||
responsibility string
|
||||
attesterAssignmentFeed *event.Feed
|
||||
proposerAssignmentFeed *event.Feed
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
rpcClient rpcClientService
|
||||
validatorIndex int
|
||||
assignedSlot uint64
|
||||
responsibility string
|
||||
attesterAssignmentFeed *event.Feed
|
||||
proposerAssignmentFeed *event.Feed
|
||||
processedAttestationFeed *event.Feed
|
||||
}
|
||||
|
||||
// NewBeaconValidator instantiates a service that interacts with a beacon node.
|
||||
func NewBeaconValidator(ctx context.Context, rpcClient rpcClientService) *Service {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &Service{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
rpcClient: rpcClient,
|
||||
attesterAssignmentFeed: new(event.Feed),
|
||||
proposerAssignmentFeed: new(event.Feed),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
rpcClient: rpcClient,
|
||||
attesterAssignmentFeed: new(event.Feed),
|
||||
proposerAssignmentFeed: new(event.Feed),
|
||||
processedAttestationFeed: new(event.Feed),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +51,7 @@ func (s *Service) Start() {
|
||||
client := s.rpcClient.BeaconServiceClient()
|
||||
go s.fetchBeaconBlocks(client)
|
||||
go s.fetchCrystallizedState(client)
|
||||
go s.fetchProcessedAttestations(client)
|
||||
}
|
||||
|
||||
// Stop the main loop..
|
||||
@@ -70,6 +73,12 @@ func (s *Service) ProposerAssignmentFeed() *event.Feed {
|
||||
return s.proposerAssignmentFeed
|
||||
}
|
||||
|
||||
// ProcessedAttestationFeed returns a feed that is wriiten to whenever a validator receives an
|
||||
// attestation from the beacon node.
|
||||
func (s *Service) ProcessedAttestationFeed() *event.Feed {
|
||||
return s.processedAttestationFeed
|
||||
}
|
||||
|
||||
func (s *Service) fetchBeaconBlocks(client pb.BeaconServiceClient) {
|
||||
stream, err := client.LatestBeaconBlock(s.ctx, &empty.Empty{})
|
||||
if err != nil {
|
||||
@@ -202,6 +211,29 @@ func (s *Service) fetchCrystallizedState(client pb.BeaconServiceClient) {
|
||||
}
|
||||
}
|
||||
|
||||
// fetchProcessedAttestations fetches processed attestations from the beacon node.
|
||||
func (s *Service) fetchProcessedAttestations(client pb.BeaconServiceClient) {
|
||||
stream, err := client.LatestAttestation(s.ctx, &empty.Empty{})
|
||||
if err != nil {
|
||||
log.Errorf("Could not setup beacon chain attestation streaming client: %v", err)
|
||||
return
|
||||
}
|
||||
for {
|
||||
attestation, err := stream.Recv()
|
||||
|
||||
// If the stream is closed, we stop the loop.
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("Could not receive latest attestation from stream: %v", err)
|
||||
continue
|
||||
}
|
||||
log.WithField("slotNumber", attestation.GetSlot()).Info("Latest attestation slot number")
|
||||
s.processedAttestationFeed.Send(attestation)
|
||||
}
|
||||
}
|
||||
|
||||
// isZeroAddress compares a withdrawal address to an empty byte array.
|
||||
func isZeroAddress(withdrawalAddress []byte) bool {
|
||||
return bytes.Equal(withdrawalAddress, []byte{})
|
||||
|
||||
@@ -34,6 +34,8 @@ func (fc *mockClient) BeaconServiceClient() pb.BeaconServiceClient {
|
||||
blockStream.EXPECT().Recv().Return(&pbp2p.BeaconBlock{}, io.EOF)
|
||||
stateStream := internal.NewMockBeaconService_LatestCrystallizedStateClient(fc.ctrl)
|
||||
stateStream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{}, io.EOF)
|
||||
attesterStream := internal.NewMockBeaconService_LatestAttestationClient(fc.ctrl)
|
||||
attesterStream.EXPECT().Recv().Return(&pbp2p.AggregatedAttestation{}, io.EOF)
|
||||
|
||||
mockServiceClient.EXPECT().LatestBeaconBlock(
|
||||
gomock.Any(),
|
||||
@@ -43,6 +45,10 @@ func (fc *mockClient) BeaconServiceClient() pb.BeaconServiceClient {
|
||||
gomock.Any(),
|
||||
&empty.Empty{},
|
||||
).Return(stateStream, nil)
|
||||
mockServiceClient.EXPECT().LatestAttestation(
|
||||
gomock.Any(),
|
||||
&empty.Empty{},
|
||||
).Return(attesterStream, nil)
|
||||
return mockServiceClient
|
||||
}
|
||||
|
||||
@@ -58,6 +64,9 @@ func TestLifecycle(t *testing.T) {
|
||||
if b.ProposerAssignmentFeed() == nil {
|
||||
t.Error("ProposerAssignmentFeed empty")
|
||||
}
|
||||
if b.ProcessedAttestationFeed() == nil {
|
||||
t.Error("ProcessedAttestationFeed empty")
|
||||
}
|
||||
b.Start()
|
||||
// TODO: find a better way to test this. The problem is that start is non-blocking, and it ends
|
||||
// before the for loops of its inner goroutines begin.
|
||||
@@ -274,3 +283,53 @@ func TestFetchCrystallizedState(t *testing.T) {
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Validator selected as proposer of the next slot")
|
||||
}
|
||||
|
||||
func TestFetchProcessedAttestations(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
b := NewBeaconValidator(context.Background(), &mockClient{ctrl})
|
||||
|
||||
// Create mock for the stream returned by LatestAttestation.
|
||||
stream := internal.NewMockBeaconService_LatestAttestationClient(ctrl)
|
||||
|
||||
// Testing if an attestation is received,triggering a log.
|
||||
stream.EXPECT().Recv().Return(&pbp2p.AggregatedAttestation{Slot: 10}, nil)
|
||||
stream.EXPECT().Recv().Return(&pbp2p.AggregatedAttestation{}, io.EOF)
|
||||
|
||||
mockServiceClient := internal.NewMockBeaconServiceClient(ctrl)
|
||||
mockServiceClient.EXPECT().LatestAttestation(
|
||||
gomock.Any(),
|
||||
gomock.Any(),
|
||||
).Return(stream, nil)
|
||||
|
||||
b.fetchProcessedAttestations(mockServiceClient)
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Latest attestation slot number")
|
||||
|
||||
// Testing an error coming from the stream.
|
||||
stream = internal.NewMockBeaconService_LatestAttestationClient(ctrl)
|
||||
stream.EXPECT().Recv().Return(&pbp2p.AggregatedAttestation{}, errors.New("stream error"))
|
||||
stream.EXPECT().Recv().Return(&pbp2p.AggregatedAttestation{}, io.EOF)
|
||||
|
||||
mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
|
||||
mockServiceClient.EXPECT().LatestAttestation(
|
||||
gomock.Any(),
|
||||
gomock.Any(),
|
||||
).Return(stream, nil)
|
||||
|
||||
b.fetchProcessedAttestations(mockServiceClient)
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "stream error")
|
||||
|
||||
// Creating a faulty stream will trigger error.
|
||||
mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
|
||||
mockServiceClient.EXPECT().LatestAttestation(
|
||||
gomock.Any(),
|
||||
gomock.Any(),
|
||||
).Return(stream, errors.New("stream creation failed"))
|
||||
|
||||
b.fetchProcessedAttestations(mockServiceClient)
|
||||
testutil.AssertLogsContain(t, hook, "stream creation failed")
|
||||
testutil.AssertLogsContain(t, hook, "Could not receive latest attestation from stream")
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1 (interfaces: BeaconServiceClient,BeaconService_LatestBeaconBlockClient,BeaconService_LatestCrystallizedStateClient)
|
||||
// Source: github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1 (interfaces: BeaconServiceClient,BeaconService_LatestBeaconBlockClient,BeaconService_LatestCrystallizedStateClient,BeaconService_LatestAttestationClient)
|
||||
|
||||
package internal
|
||||
|
||||
@@ -56,6 +56,24 @@ func (mr *MockBeaconServiceClientMockRecorder) FetchShuffledValidatorIndices(arg
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchShuffledValidatorIndices", reflect.TypeOf((*MockBeaconServiceClient)(nil).FetchShuffledValidatorIndices), varargs...)
|
||||
}
|
||||
|
||||
// LatestAttestation mocks base method
|
||||
func (m *MockBeaconServiceClient) LatestAttestation(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (v10.BeaconService_LatestAttestationClient, error) {
|
||||
varargs := []interface{}{arg0, arg1}
|
||||
for _, a := range arg2 {
|
||||
varargs = append(varargs, a)
|
||||
}
|
||||
ret := m.ctrl.Call(m, "LatestAttestation", varargs...)
|
||||
ret0, _ := ret[0].(v10.BeaconService_LatestAttestationClient)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// LatestAttestation indicates an expected call of LatestAttestation
|
||||
func (mr *MockBeaconServiceClientMockRecorder) LatestAttestation(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
|
||||
varargs := append([]interface{}{arg0, arg1}, arg2...)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestAttestation", reflect.TypeOf((*MockBeaconServiceClient)(nil).LatestAttestation), varargs...)
|
||||
}
|
||||
|
||||
// LatestBeaconBlock mocks base method
|
||||
func (m *MockBeaconServiceClient) LatestBeaconBlock(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (v10.BeaconService_LatestBeaconBlockClient, error) {
|
||||
varargs := []interface{}{arg0, arg1}
|
||||
@@ -309,3 +327,112 @@ func (m *MockBeaconService_LatestCrystallizedStateClient) Trailer() metadata.MD
|
||||
func (mr *MockBeaconService_LatestCrystallizedStateClientMockRecorder) Trailer() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockBeaconService_LatestCrystallizedStateClient)(nil).Trailer))
|
||||
}
|
||||
|
||||
// MockBeaconService_LatestAttestationClient is a mock of BeaconService_LatestAttestationClient interface
|
||||
type MockBeaconService_LatestAttestationClient struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockBeaconService_LatestAttestationClientMockRecorder
|
||||
}
|
||||
|
||||
// MockBeaconService_LatestAttestationClientMockRecorder is the mock recorder for MockBeaconService_LatestAttestationClient
|
||||
type MockBeaconService_LatestAttestationClientMockRecorder struct {
|
||||
mock *MockBeaconService_LatestAttestationClient
|
||||
}
|
||||
|
||||
// NewMockBeaconService_LatestAttestationClient creates a new mock instance
|
||||
func NewMockBeaconService_LatestAttestationClient(ctrl *gomock.Controller) *MockBeaconService_LatestAttestationClient {
|
||||
mock := &MockBeaconService_LatestAttestationClient{ctrl: ctrl}
|
||||
mock.recorder = &MockBeaconService_LatestAttestationClientMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockBeaconService_LatestAttestationClient) EXPECT() *MockBeaconService_LatestAttestationClientMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// CloseSend mocks base method
|
||||
func (m *MockBeaconService_LatestAttestationClient) CloseSend() error {
|
||||
ret := m.ctrl.Call(m, "CloseSend")
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// CloseSend indicates an expected call of CloseSend
|
||||
func (mr *MockBeaconService_LatestAttestationClientMockRecorder) CloseSend() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockBeaconService_LatestAttestationClient)(nil).CloseSend))
|
||||
}
|
||||
|
||||
// Context mocks base method
|
||||
func (m *MockBeaconService_LatestAttestationClient) Context() context.Context {
|
||||
ret := m.ctrl.Call(m, "Context")
|
||||
ret0, _ := ret[0].(context.Context)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Context indicates an expected call of Context
|
||||
func (mr *MockBeaconService_LatestAttestationClientMockRecorder) Context() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBeaconService_LatestAttestationClient)(nil).Context))
|
||||
}
|
||||
|
||||
// Header mocks base method
|
||||
func (m *MockBeaconService_LatestAttestationClient) Header() (metadata.MD, error) {
|
||||
ret := m.ctrl.Call(m, "Header")
|
||||
ret0, _ := ret[0].(metadata.MD)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Header indicates an expected call of Header
|
||||
func (mr *MockBeaconService_LatestAttestationClientMockRecorder) Header() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockBeaconService_LatestAttestationClient)(nil).Header))
|
||||
}
|
||||
|
||||
// Recv mocks base method
|
||||
func (m *MockBeaconService_LatestAttestationClient) Recv() (*v1.AggregatedAttestation, error) {
|
||||
ret := m.ctrl.Call(m, "Recv")
|
||||
ret0, _ := ret[0].(*v1.AggregatedAttestation)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Recv indicates an expected call of Recv
|
||||
func (mr *MockBeaconService_LatestAttestationClientMockRecorder) Recv() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockBeaconService_LatestAttestationClient)(nil).Recv))
|
||||
}
|
||||
|
||||
// RecvMsg mocks base method
|
||||
func (m *MockBeaconService_LatestAttestationClient) RecvMsg(arg0 interface{}) error {
|
||||
ret := m.ctrl.Call(m, "RecvMsg", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// RecvMsg indicates an expected call of RecvMsg
|
||||
func (mr *MockBeaconService_LatestAttestationClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBeaconService_LatestAttestationClient)(nil).RecvMsg), arg0)
|
||||
}
|
||||
|
||||
// SendMsg mocks base method
|
||||
func (m *MockBeaconService_LatestAttestationClient) SendMsg(arg0 interface{}) error {
|
||||
ret := m.ctrl.Call(m, "SendMsg", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SendMsg indicates an expected call of SendMsg
|
||||
func (mr *MockBeaconService_LatestAttestationClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBeaconService_LatestAttestationClient)(nil).SendMsg), arg0)
|
||||
}
|
||||
|
||||
// Trailer mocks base method
|
||||
func (m *MockBeaconService_LatestAttestationClient) Trailer() metadata.MD {
|
||||
ret := m.ctrl.Call(m, "Trailer")
|
||||
ret0, _ := ret[0].(metadata.MD)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Trailer indicates an expected call of Trailer
|
||||
func (mr *MockBeaconService_LatestAttestationClientMockRecorder) Trailer() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockBeaconService_LatestAttestationClient)(nil).Trailer))
|
||||
}
|
||||
|
||||
@@ -198,9 +198,11 @@ func (s *ShardEthereum) registerProposerService() error {
|
||||
}
|
||||
|
||||
prop := proposer.NewProposer(context.TODO(), &proposer.Config{
|
||||
Assigner: beaconService,
|
||||
Client: rpcService,
|
||||
AssignmentBuf: 100,
|
||||
Assigner: beaconService,
|
||||
Client: rpcService,
|
||||
AssignmentBuf: 100,
|
||||
AttestationBufferSize: 100,
|
||||
AttesterFeed: beaconService,
|
||||
})
|
||||
return s.services.RegisterService(prop)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ go_library(
|
||||
"//proto/beacon/p2p/v1:go_default_library",
|
||||
"//proto/beacon/rpc/v1:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//event:go_default_library",
|
||||
"@com_github_gogo_protobuf//proto:go_default_library",
|
||||
"@com_github_golang_protobuf//proto:go_default_library",
|
||||
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@org_golang_x_crypto//blake2b:go_default_library",
|
||||
|
||||
@@ -3,10 +3,12 @@
|
||||
package proposer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
|
||||
@@ -24,33 +26,47 @@ type assignmentAnnouncer interface {
|
||||
ProposerAssignmentFeed() *event.Feed
|
||||
}
|
||||
|
||||
type rpcAttestationService interface {
|
||||
ProcessedAttestationFeed() *event.Feed
|
||||
}
|
||||
|
||||
// Proposer holds functionality required to run a block proposer
|
||||
// in Ethereum 2.0. Must satisfy the Service interface defined in
|
||||
// sharding/service.go.
|
||||
type Proposer struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
assigner assignmentAnnouncer
|
||||
rpcClientService rpcClientService
|
||||
assignmentChan chan *pbp2p.BeaconBlock
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
assigner assignmentAnnouncer
|
||||
rpcClientService rpcClientService
|
||||
assignmentChan chan *pbp2p.BeaconBlock
|
||||
attestationService rpcAttestationService
|
||||
attestationChan chan *pbp2p.AggregatedAttestation
|
||||
pendingAttestation []*pbp2p.AggregatedAttestation
|
||||
mutex *sync.Mutex
|
||||
}
|
||||
|
||||
// Config options for proposer service.
|
||||
type Config struct {
|
||||
AssignmentBuf int
|
||||
Assigner assignmentAnnouncer
|
||||
Client rpcClientService
|
||||
AssignmentBuf int
|
||||
AttestationBufferSize int
|
||||
Assigner assignmentAnnouncer
|
||||
AttesterFeed rpcAttestationService
|
||||
Client rpcClientService
|
||||
}
|
||||
|
||||
// NewProposer creates a new attester instance.
|
||||
func NewProposer(ctx context.Context, cfg *Config) *Proposer {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &Proposer{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
assigner: cfg.Assigner,
|
||||
rpcClientService: cfg.Client,
|
||||
assignmentChan: make(chan *pbp2p.BeaconBlock, cfg.AssignmentBuf),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
assigner: cfg.Assigner,
|
||||
rpcClientService: cfg.Client,
|
||||
attestationService: cfg.AttesterFeed,
|
||||
assignmentChan: make(chan *pbp2p.BeaconBlock, cfg.AssignmentBuf),
|
||||
attestationChan: make(chan *pbp2p.AggregatedAttestation, cfg.AttestationBufferSize),
|
||||
pendingAttestation: make([]*pbp2p.AggregatedAttestation, 0),
|
||||
mutex: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,7 +74,10 @@ func NewProposer(ctx context.Context, cfg *Config) *Proposer {
|
||||
func (p *Proposer) Start() {
|
||||
log.Info("Starting service")
|
||||
client := p.rpcClientService.ProposerServiceClient()
|
||||
|
||||
go p.run(p.ctx.Done(), client)
|
||||
go p.processAttestation(p.ctx.Done())
|
||||
|
||||
}
|
||||
|
||||
// Stop the main loop.
|
||||
@@ -68,10 +87,65 @@ func (p *Proposer) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DoesAttestationExist checks if an attester has already attested to a block.
|
||||
func (p *Proposer) DoesAttestationExist(attestation *pbp2p.AggregatedAttestation) bool {
|
||||
exists := false
|
||||
for _, record := range p.pendingAttestation {
|
||||
if bytes.Equal(record.GetAttesterBitfield(), attestation.GetAttesterBitfield()) {
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
return exists
|
||||
}
|
||||
|
||||
// AddPendingAttestation adds a pending attestation to the memory so that it can be included
|
||||
// in the next proposed block.
|
||||
func (p *Proposer) AddPendingAttestation(attestation *pbp2p.AggregatedAttestation) {
|
||||
p.pendingAttestation = append(p.pendingAttestation, attestation)
|
||||
}
|
||||
|
||||
// AggregateAllSignatures aggregates all the signatures of the attesters. This is currently a
|
||||
// stub for now till BLS/other signature schemes are implemented.
|
||||
func (p *Proposer) AggregateAllSignatures(attestations []*pbp2p.AggregatedAttestation) []uint32 {
|
||||
// TODO: Implement Signature Aggregation.
|
||||
return []uint32{}
|
||||
}
|
||||
|
||||
// GenerateBitmask creates the attestation bitmask from all the attester bitfields in the
|
||||
// attestation records.
|
||||
func (p *Proposer) GenerateBitmask(attestations []*pbp2p.AggregatedAttestation) []byte {
|
||||
// TODO: Implement bitmask where all attesters bitfields are aggregated.
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
// processAttestation processes incoming broadcasted attestations from the beacon node.
|
||||
func (p *Proposer) processAttestation(done <-chan struct{}) {
|
||||
attestationSub := p.attestationService.ProcessedAttestationFeed().Subscribe(p.attestationChan)
|
||||
defer attestationSub.Unsubscribe()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
log.Debug("Proposer context closed, exiting goroutine")
|
||||
return
|
||||
case attestationRecord := <-p.attestationChan:
|
||||
|
||||
attestationExists := p.DoesAttestationExist(attestationRecord)
|
||||
if !attestationExists {
|
||||
p.AddPendingAttestation(attestationRecord)
|
||||
log.Info("Attestation stored in memory")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// run the main event loop that listens for a proposer assignment.
|
||||
func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) {
|
||||
sub := p.assigner.ProposerAssignmentFeed().Subscribe(p.assignmentChan)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
@@ -95,13 +169,19 @@ func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) {
|
||||
}
|
||||
latestBlockHash := blake2b.Sum512(data)
|
||||
|
||||
// To prevent any unaccounted attestations from being added.
|
||||
p.mutex.Lock()
|
||||
|
||||
agSig := p.AggregateAllSignatures(p.pendingAttestation)
|
||||
bitmask := p.GenerateBitmask(p.pendingAttestation)
|
||||
|
||||
// TODO: Implement real proposals with randao reveals and attestation fields.
|
||||
req := &pb.ProposeRequest{
|
||||
ParentHash: latestBlockHash[:],
|
||||
SlotNumber: latestBeaconBlock.GetSlotNumber() + 1,
|
||||
RandaoReveal: []byte{},
|
||||
AttestationBitmask: []byte{},
|
||||
AttestationAggregateSig: []uint32{},
|
||||
AttestationBitmask: bitmask,
|
||||
AttestationAggregateSig: agSig,
|
||||
Timestamp: ptypes.TimestampNow(),
|
||||
}
|
||||
|
||||
@@ -110,7 +190,10 @@ func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) {
|
||||
log.Errorf("Could not propose block: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Infof("Block proposed successfully with hash 0x%x", res.BlockHash)
|
||||
p.pendingAttestation = nil
|
||||
p.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package proposer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -36,6 +37,53 @@ func (m *mockAssigner) ProposerAssignmentFeed() *event.Feed {
|
||||
return new(event.Feed)
|
||||
}
|
||||
|
||||
type mockAttesterFeed struct{}
|
||||
|
||||
func (m *mockAttesterFeed) ProcessedAttestationFeed() *event.Feed {
|
||||
return new(event.Feed)
|
||||
}
|
||||
|
||||
func TestDoesAttestationExist(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
cfg := &Config{
|
||||
AssignmentBuf: 0,
|
||||
Assigner: &mockAssigner{},
|
||||
Client: &mockClient{ctrl},
|
||||
}
|
||||
p := NewProposer(context.Background(), cfg)
|
||||
|
||||
p.pendingAttestation = []*pbp2p.AggregatedAttestation{
|
||||
{
|
||||
AttesterBitfield: []byte{'a'},
|
||||
},
|
||||
{
|
||||
AttesterBitfield: []byte{'b'},
|
||||
},
|
||||
{
|
||||
AttesterBitfield: []byte{'c'},
|
||||
},
|
||||
{
|
||||
AttesterBitfield: []byte{'d'},
|
||||
}}
|
||||
|
||||
fakeAttestation := &pbp2p.AggregatedAttestation{
|
||||
AttesterBitfield: []byte{'e'},
|
||||
}
|
||||
|
||||
realAttestation := &pbp2p.AggregatedAttestation{
|
||||
AttesterBitfield: []byte{'a'},
|
||||
}
|
||||
|
||||
if p.DoesAttestationExist(fakeAttestation) {
|
||||
t.Fatal("invalid attestation exists")
|
||||
}
|
||||
|
||||
if !p.DoesAttestationExist(realAttestation) {
|
||||
t.Fatal("valid attestation does not exists")
|
||||
}
|
||||
|
||||
}
|
||||
func TestLifecycle(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctrl := gomock.NewController(t)
|
||||
@@ -44,6 +92,7 @@ func TestLifecycle(t *testing.T) {
|
||||
AssignmentBuf: 0,
|
||||
Assigner: &mockAssigner{},
|
||||
Client: &mockClient{ctrl},
|
||||
AttesterFeed: &mockAttesterFeed{},
|
||||
}
|
||||
p := NewProposer(context.Background(), cfg)
|
||||
p.Start()
|
||||
@@ -53,7 +102,7 @@ func TestLifecycle(t *testing.T) {
|
||||
testutil.AssertLogsContain(t, hook, "Stopping service")
|
||||
}
|
||||
|
||||
func TestProposerLoop(t *testing.T) {
|
||||
func TestProposerReceiveBeaconBlock(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
@@ -61,12 +110,11 @@ func TestProposerLoop(t *testing.T) {
|
||||
AssignmentBuf: 0,
|
||||
Assigner: &mockAssigner{},
|
||||
Client: &mockClient{ctrl},
|
||||
AttesterFeed: &mockAttesterFeed{},
|
||||
}
|
||||
p := NewProposer(context.Background(), cfg)
|
||||
|
||||
mockServiceClient := internal.NewMockProposerServiceClient(ctrl)
|
||||
|
||||
// Expect first call to go through correctly.
|
||||
mockServiceClient.EXPECT().ProposeBlock(
|
||||
gomock.Any(),
|
||||
gomock.Any(),
|
||||
@@ -76,6 +124,7 @@ func TestProposerLoop(t *testing.T) {
|
||||
|
||||
doneChan := make(chan struct{})
|
||||
exitRoutine := make(chan bool)
|
||||
|
||||
go func() {
|
||||
p.run(doneChan, mockServiceClient)
|
||||
<-exitRoutine
|
||||
@@ -89,7 +138,7 @@ func TestProposerLoop(t *testing.T) {
|
||||
testutil.AssertLogsContain(t, hook, "Proposer context closed")
|
||||
}
|
||||
|
||||
func TestProposerMarshalError(t *testing.T) {
|
||||
func TestProposerProcessAttestation(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
@@ -97,27 +146,40 @@ func TestProposerMarshalError(t *testing.T) {
|
||||
AssignmentBuf: 0,
|
||||
Assigner: &mockAssigner{},
|
||||
Client: &mockClient{ctrl},
|
||||
AttesterFeed: &mockAttesterFeed{},
|
||||
}
|
||||
p := NewProposer(context.Background(), cfg)
|
||||
|
||||
mockServiceClient := internal.NewMockProposerServiceClient(ctrl)
|
||||
|
||||
doneChan := make(chan struct{})
|
||||
exitRoutine := make(chan bool)
|
||||
|
||||
go func() {
|
||||
p.run(doneChan, mockServiceClient)
|
||||
p.processAttestation(doneChan)
|
||||
<-exitRoutine
|
||||
}()
|
||||
p.pendingAttestation = []*pbp2p.AggregatedAttestation{
|
||||
{
|
||||
AttesterBitfield: []byte{'a'},
|
||||
},
|
||||
{
|
||||
AttesterBitfield: []byte{'b'},
|
||||
}}
|
||||
|
||||
attestation := &pbp2p.AggregatedAttestation{AttesterBitfield: []byte{'c'}}
|
||||
p.attestationChan <- attestation
|
||||
|
||||
p.assignmentChan <- nil
|
||||
doneChan <- struct{}{}
|
||||
exitRoutine <- true
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Could not marshal latest beacon block")
|
||||
testutil.AssertLogsContain(t, hook, "Attestation stored in memory")
|
||||
testutil.AssertLogsContain(t, hook, "Proposer context closed")
|
||||
|
||||
if !bytes.Equal(p.pendingAttestation[2].GetAttesterBitfield(), []byte{'c'}) {
|
||||
t.Errorf("attestation was unable to be saved %v", p.pendingAttestation[2].GetAttesterBitfield())
|
||||
}
|
||||
}
|
||||
|
||||
func TestProposerErrorLoop(t *testing.T) {
|
||||
func TestFullProposalOfBlock(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
@@ -125,6 +187,61 @@ func TestProposerErrorLoop(t *testing.T) {
|
||||
AssignmentBuf: 0,
|
||||
Assigner: &mockAssigner{},
|
||||
Client: &mockClient{ctrl},
|
||||
AttesterFeed: &mockAttesterFeed{},
|
||||
}
|
||||
p := NewProposer(context.Background(), cfg)
|
||||
mockServiceClient := internal.NewMockProposerServiceClient(ctrl)
|
||||
mockServiceClient.EXPECT().ProposeBlock(
|
||||
gomock.Any(),
|
||||
gomock.Any(),
|
||||
).Return(&pb.ProposeResponse{
|
||||
BlockHash: []byte("hi"),
|
||||
}, nil)
|
||||
|
||||
doneChan := make(chan struct{})
|
||||
exitRoutine := make(chan bool)
|
||||
|
||||
go p.run(doneChan, mockServiceClient)
|
||||
|
||||
go func() {
|
||||
p.processAttestation(doneChan)
|
||||
<-exitRoutine
|
||||
}()
|
||||
|
||||
p.pendingAttestation = []*pbp2p.AggregatedAttestation{
|
||||
{
|
||||
AttesterBitfield: []byte{'a'},
|
||||
},
|
||||
{
|
||||
AttesterBitfield: []byte{'b'},
|
||||
}}
|
||||
|
||||
attestation := &pbp2p.AggregatedAttestation{AttesterBitfield: []byte{'c'}}
|
||||
p.attestationChan <- attestation
|
||||
|
||||
p.assignmentChan <- &pbp2p.BeaconBlock{SlotNumber: 5}
|
||||
|
||||
doneChan <- struct{}{}
|
||||
doneChan <- struct{}{}
|
||||
exitRoutine <- true
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Performing proposer responsibility")
|
||||
testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash 0x%x", []byte("hi")))
|
||||
testutil.AssertLogsContain(t, hook, "Proposer context closed")
|
||||
testutil.AssertLogsContain(t, hook, "Attestation stored in memory")
|
||||
testutil.AssertLogsContain(t, hook, "Proposer context closed")
|
||||
|
||||
}
|
||||
|
||||
func TestProposerServiceErrors(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
cfg := &Config{
|
||||
AssignmentBuf: 0,
|
||||
Assigner: &mockAssigner{},
|
||||
Client: &mockClient{ctrl},
|
||||
AttesterFeed: &mockAttesterFeed{},
|
||||
}
|
||||
p := NewProposer(context.Background(), cfg)
|
||||
|
||||
@@ -138,16 +255,24 @@ func TestProposerErrorLoop(t *testing.T) {
|
||||
|
||||
doneChan := make(chan struct{})
|
||||
exitRoutine := make(chan bool)
|
||||
|
||||
go p.run(doneChan, mockServiceClient)
|
||||
|
||||
go func() {
|
||||
p.run(doneChan, mockServiceClient)
|
||||
p.processAttestation(doneChan)
|
||||
<-exitRoutine
|
||||
}()
|
||||
|
||||
p.assignmentChan <- &pbp2p.BeaconBlock{SlotNumber: 5}
|
||||
p.attestationChan <- &pbp2p.AggregatedAttestation{}
|
||||
p.assignmentChan <- nil
|
||||
p.assignmentChan <- &pbp2p.BeaconBlock{SlotNumber: 9}
|
||||
|
||||
doneChan <- struct{}{}
|
||||
doneChan <- struct{}{}
|
||||
exitRoutine <- true
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Performing proposer responsibility")
|
||||
testutil.AssertLogsContain(t, hook, "bad block proposed")
|
||||
testutil.AssertLogsContain(t, hook, "Could not marshal latest beacon block")
|
||||
testutil.AssertLogsContain(t, hook, "Proposer context closed")
|
||||
testutil.AssertLogsContain(t, hook, "Could not propose block: bad block proposed")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user