mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-05-02 03:02:54 -04:00
**What type of PR is this?** Other **What does this PR do? Why is it needed?** Follow-up to https://github.com/OffchainLabs/prysm/pull/16215: this PR improves logging, fixes stuttering in package naming, adds additional unit tests, and deduplicates fallback node code. **Which issue(s) does this PR fix?** Fixes a potential race when reconnecting very quickly to the same host that still has a stale connection. **Other notes for review** **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable).
349 lines
12 KiB
Go
349 lines
12 KiB
Go
package grpc_api
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"testing"
|
|
"time"
|
|
|
|
eventClient "github.com/OffchainLabs/prysm/v7/api/client/event"
|
|
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
|
|
"github.com/OffchainLabs/prysm/v7/api/server/structs"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
|
eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
|
"github.com/OffchainLabs/prysm/v7/testing/assert"
|
|
mock2 "github.com/OffchainLabs/prysm/v7/testing/mock"
|
|
"github.com/OffchainLabs/prysm/v7/testing/require"
|
|
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
|
validatorTesting "github.com/OffchainLabs/prysm/v7/validator/testing"
|
|
logTest "github.com/sirupsen/logrus/hooks/test"
|
|
"go.uber.org/mock/gomock"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/protobuf/types/known/emptypb"
|
|
)
|
|
|
|
func TestToValidatorDutiesContainer_HappyPath(t *testing.T) {
|
|
// Create a mock DutiesResponse with current and next duties.
|
|
dutiesResp := ð.DutiesResponse{
|
|
CurrentEpochDuties: []*eth.DutiesResponse_Duty{
|
|
{
|
|
Committee: []primitives.ValidatorIndex{100, 101},
|
|
CommitteeIndex: 4,
|
|
AttesterSlot: 200,
|
|
ProposerSlots: []primitives.Slot{400},
|
|
PublicKey: []byte{0xAA, 0xBB},
|
|
Status: eth.ValidatorStatus_ACTIVE,
|
|
ValidatorIndex: 101,
|
|
IsSyncCommittee: false,
|
|
CommitteesAtSlot: 2,
|
|
},
|
|
},
|
|
NextEpochDuties: []*eth.DutiesResponse_Duty{
|
|
{
|
|
Committee: []primitives.ValidatorIndex{300, 301},
|
|
CommitteeIndex: 8,
|
|
AttesterSlot: 600,
|
|
ProposerSlots: []primitives.Slot{700, 701},
|
|
PublicKey: []byte{0xCC, 0xDD},
|
|
Status: eth.ValidatorStatus_ACTIVE,
|
|
ValidatorIndex: 301,
|
|
IsSyncCommittee: true,
|
|
CommitteesAtSlot: 3,
|
|
},
|
|
},
|
|
}
|
|
|
|
gotContainer, err := toValidatorDutiesContainer(dutiesResp)
|
|
require.NoError(t, err)
|
|
|
|
// Validate we have the correct number of duties in current and next epochs.
|
|
assert.Equal(t, len(gotContainer.CurrentEpochDuties), len(dutiesResp.CurrentEpochDuties))
|
|
assert.Equal(t, len(gotContainer.NextEpochDuties), len(dutiesResp.NextEpochDuties))
|
|
|
|
firstCurrentDuty := gotContainer.CurrentEpochDuties[0]
|
|
expectedCurrentDuty := dutiesResp.CurrentEpochDuties[0]
|
|
assert.DeepEqual(t, firstCurrentDuty.PublicKey, expectedCurrentDuty.PublicKey)
|
|
assert.Equal(t, firstCurrentDuty.ValidatorIndex, expectedCurrentDuty.ValidatorIndex)
|
|
assert.DeepEqual(t, firstCurrentDuty.ProposerSlots, expectedCurrentDuty.ProposerSlots)
|
|
|
|
firstNextDuty := gotContainer.NextEpochDuties[0]
|
|
expectedNextDuty := dutiesResp.NextEpochDuties[0]
|
|
assert.DeepEqual(t, firstNextDuty.PublicKey, expectedNextDuty.PublicKey)
|
|
assert.Equal(t, firstNextDuty.ValidatorIndex, expectedNextDuty.ValidatorIndex)
|
|
assert.DeepEqual(t, firstNextDuty.ProposerSlots, expectedNextDuty.ProposerSlots)
|
|
}
|
|
|
|
func TestToValidatorDutiesContainerV2_HappyPath(t *testing.T) {
|
|
// Create a mock DutiesResponse with current and next duties.
|
|
dutiesResp := ð.DutiesV2Response{
|
|
CurrentEpochDuties: []*eth.DutiesV2Response_Duty{
|
|
{
|
|
CommitteeLength: 2,
|
|
CommitteeIndex: 4,
|
|
ValidatorCommitteeIndex: 1,
|
|
AttesterSlot: 200,
|
|
ProposerSlots: []primitives.Slot{400},
|
|
PublicKey: []byte{0xAA, 0xBB},
|
|
Status: eth.ValidatorStatus_ACTIVE,
|
|
ValidatorIndex: 101,
|
|
IsSyncCommittee: false,
|
|
CommitteesAtSlot: 2,
|
|
},
|
|
},
|
|
NextEpochDuties: []*eth.DutiesV2Response_Duty{
|
|
{
|
|
CommitteeLength: 2,
|
|
CommitteeIndex: 8,
|
|
ValidatorCommitteeIndex: 1,
|
|
AttesterSlot: 600,
|
|
ProposerSlots: []primitives.Slot{700, 701},
|
|
PublicKey: []byte{0xCC, 0xDD},
|
|
Status: eth.ValidatorStatus_ACTIVE,
|
|
ValidatorIndex: 301,
|
|
IsSyncCommittee: true,
|
|
CommitteesAtSlot: 3,
|
|
},
|
|
},
|
|
}
|
|
|
|
gotContainer, err := toValidatorDutiesContainerV2(dutiesResp)
|
|
require.NoError(t, err)
|
|
|
|
// Validate we have the correct number of duties in current and next epochs.
|
|
assert.Equal(t, len(gotContainer.CurrentEpochDuties), len(dutiesResp.CurrentEpochDuties))
|
|
assert.Equal(t, len(gotContainer.NextEpochDuties), len(dutiesResp.NextEpochDuties))
|
|
|
|
firstCurrentDuty := gotContainer.CurrentEpochDuties[0]
|
|
expectedCurrentDuty := dutiesResp.CurrentEpochDuties[0]
|
|
assert.DeepEqual(t, firstCurrentDuty.PublicKey, expectedCurrentDuty.PublicKey)
|
|
assert.Equal(t, firstCurrentDuty.ValidatorIndex, expectedCurrentDuty.ValidatorIndex)
|
|
assert.DeepEqual(t, firstCurrentDuty.ProposerSlots, expectedCurrentDuty.ProposerSlots)
|
|
|
|
firstNextDuty := gotContainer.NextEpochDuties[0]
|
|
expectedNextDuty := dutiesResp.NextEpochDuties[0]
|
|
assert.DeepEqual(t, firstNextDuty.PublicKey, expectedNextDuty.PublicKey)
|
|
assert.Equal(t, firstNextDuty.ValidatorIndex, expectedNextDuty.ValidatorIndex)
|
|
assert.DeepEqual(t, firstNextDuty.ProposerSlots, expectedNextDuty.ProposerSlots)
|
|
}
|
|
|
|
func TestWaitForChainStart_StreamSetupFails(t *testing.T) {
|
|
ctrl := gomock.NewController(t)
|
|
defer ctrl.Finish()
|
|
|
|
beaconNodeValidatorClient := mock2.NewMockBeaconNodeValidatorClient(ctrl)
|
|
beaconNodeValidatorClient.EXPECT().WaitForChainStart(
|
|
gomock.Any(),
|
|
gomock.Any(),
|
|
).Return(nil, errors.New("failed stream"))
|
|
|
|
validatorClient := &grpcValidatorClient{
|
|
grpcClientManager: newGrpcClientManager(
|
|
validatorTesting.MockNodeConnection(),
|
|
func(_ grpc.ClientConnInterface) eth.BeaconNodeValidatorClient {
|
|
return beaconNodeValidatorClient
|
|
},
|
|
),
|
|
isEventStreamRunning: true,
|
|
}
|
|
_, err := validatorClient.WaitForChainStart(t.Context(), &emptypb.Empty{})
|
|
want := "could not setup beacon chain ChainStart streaming client"
|
|
assert.ErrorContains(t, want, err)
|
|
}
|
|
|
|
func TestStartEventStream(t *testing.T) {
|
|
hook := logTest.NewGlobal()
|
|
ctx, cancel := context.WithCancel(t.Context())
|
|
defer cancel()
|
|
ctrl := gomock.NewController(t)
|
|
defer ctrl.Finish()
|
|
beaconNodeValidatorClient := mock2.NewMockBeaconNodeValidatorClient(ctrl)
|
|
grpcClient := &grpcValidatorClient{
|
|
grpcClientManager: newGrpcClientManager(
|
|
validatorTesting.MockNodeConnection(),
|
|
func(_ grpc.ClientConnInterface) eth.BeaconNodeValidatorClient {
|
|
return beaconNodeValidatorClient
|
|
},
|
|
),
|
|
isEventStreamRunning: true,
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
topics []string
|
|
prepare func()
|
|
verify func(t *testing.T, event *eventClient.Event)
|
|
}{
|
|
{
|
|
name: "Happy path Head topic",
|
|
topics: []string{"head"},
|
|
prepare: func() {
|
|
stream := mock2.NewMockBeaconNodeValidator_StreamSlotsClient(ctrl)
|
|
beaconNodeValidatorClient.EXPECT().StreamSlots(gomock.Any(),
|
|
ð.StreamSlotsRequest{VerifiedOnly: true}).Return(stream, nil)
|
|
stream.EXPECT().Context().Return(ctx).AnyTimes()
|
|
stream.EXPECT().Recv().Return(
|
|
ð.StreamSlotsResponse{Slot: 123},
|
|
nil,
|
|
).AnyTimes()
|
|
},
|
|
verify: func(t *testing.T, event *eventClient.Event) {
|
|
require.Equal(t, event.EventType, eventClient.EventHead)
|
|
head := structs.HeadEvent{}
|
|
require.NoError(t, json.Unmarshal(event.Data, &head))
|
|
require.Equal(t, head.Slot, "123")
|
|
},
|
|
},
|
|
{
|
|
name: "no head produces error",
|
|
topics: []string{"unsupportedTopic"},
|
|
prepare: func() {
|
|
stream := mock2.NewMockBeaconNodeValidator_StreamSlotsClient(ctrl)
|
|
beaconNodeValidatorClient.EXPECT().StreamSlots(gomock.Any(),
|
|
ð.StreamSlotsRequest{VerifiedOnly: true}).Return(stream, nil)
|
|
stream.EXPECT().Context().Return(ctx).AnyTimes()
|
|
stream.EXPECT().Recv().Return(
|
|
ð.StreamSlotsResponse{Slot: 123},
|
|
nil,
|
|
).AnyTimes()
|
|
},
|
|
verify: func(t *testing.T, event *eventClient.Event) {
|
|
require.Equal(t, event.EventType, eventClient.EventConnectionError)
|
|
},
|
|
},
|
|
{
|
|
name: "Unsupported topics warning",
|
|
topics: []string{"head", "unsupportedTopic"},
|
|
prepare: func() {
|
|
stream := mock2.NewMockBeaconNodeValidator_StreamSlotsClient(ctrl)
|
|
beaconNodeValidatorClient.EXPECT().StreamSlots(gomock.Any(),
|
|
ð.StreamSlotsRequest{VerifiedOnly: true}).Return(stream, nil)
|
|
stream.EXPECT().Context().Return(ctx).AnyTimes()
|
|
stream.EXPECT().Recv().Return(
|
|
ð.StreamSlotsResponse{Slot: 123},
|
|
nil,
|
|
).AnyTimes()
|
|
},
|
|
verify: func(t *testing.T, event *eventClient.Event) {
|
|
require.Equal(t, event.EventType, eventClient.EventHead)
|
|
head := structs.HeadEvent{}
|
|
require.NoError(t, json.Unmarshal(event.Data, &head))
|
|
require.Equal(t, head.Slot, "123")
|
|
assert.LogsContain(t, hook, "gRPC only supports the head topic")
|
|
},
|
|
},
|
|
{
|
|
name: "No topics error",
|
|
topics: []string{},
|
|
prepare: func() {},
|
|
verify: func(t *testing.T, event *eventClient.Event) {
|
|
require.Equal(t, event.EventType, eventClient.EventError)
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
eventsChannel := make(chan *eventClient.Event, 1) // Buffer to prevent blocking
|
|
tc.prepare() // Setup mock expectations
|
|
|
|
go grpcClient.StartEventStream(ctx, tc.topics, eventsChannel)
|
|
|
|
event := <-eventsChannel
|
|
// Depending on what you're testing, you may need a timeout or a specific number of events to read
|
|
time.AfterFunc(1*time.Second, cancel) // Prevents hanging forever
|
|
tc.verify(t, event)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEnsureReady(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
hosts []string
|
|
healthResults []error // one per GetHealth call in order
|
|
expectedResult bool
|
|
expectedIndex int // expected provider index after EnsureReady
|
|
}{
|
|
{
|
|
name: "Single host ready",
|
|
hosts: []string{"host1:4000"},
|
|
healthResults: []error{nil},
|
|
expectedResult: true,
|
|
expectedIndex: 0,
|
|
},
|
|
{
|
|
name: "Single host not ready",
|
|
hosts: []string{"host1:4000"},
|
|
healthResults: []error{errors.New("not synced")},
|
|
expectedResult: false,
|
|
expectedIndex: 0,
|
|
},
|
|
{
|
|
name: "Multiple hosts first ready",
|
|
hosts: []string{"host1:4000", "host2:4000", "host3:4000"},
|
|
healthResults: []error{nil},
|
|
expectedResult: true,
|
|
expectedIndex: 0,
|
|
},
|
|
{
|
|
name: "Failover to second host",
|
|
hosts: []string{"host1:4000", "host2:4000", "host3:4000"},
|
|
healthResults: []error{errors.New("not synced"), nil},
|
|
expectedResult: true,
|
|
expectedIndex: 1,
|
|
},
|
|
{
|
|
name: "Failover to third host",
|
|
hosts: []string{"host1:4000", "host2:4000", "host3:4000"},
|
|
healthResults: []error{errors.New("not synced"), errors.New("not synced"), nil},
|
|
expectedResult: true,
|
|
expectedIndex: 2,
|
|
},
|
|
{
|
|
name: "All hosts down",
|
|
hosts: []string{"host1:4000", "host2:4000", "host3:4000"},
|
|
healthResults: []error{errors.New("not synced"), errors.New("not synced"), errors.New("not synced")},
|
|
expectedResult: false,
|
|
expectedIndex: 2,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
ctrl := gomock.NewController(t)
|
|
|
|
mockProvider := &grpcutil.MockGrpcProvider{
|
|
MockHosts: tt.hosts,
|
|
}
|
|
conn, err := validatorHelpers.NewNodeConnection(
|
|
validatorHelpers.WithGRPCProvider(mockProvider),
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
mockNodeClient := mock2.NewMockNodeClient(ctrl)
|
|
for _, healthErr := range tt.healthResults {
|
|
if healthErr != nil {
|
|
mockNodeClient.EXPECT().GetHealth(gomock.Any(), gomock.Any()).Return(nil, healthErr)
|
|
} else {
|
|
mockNodeClient.EXPECT().GetHealth(gomock.Any(), gomock.Any()).Return(&emptypb.Empty{}, nil)
|
|
}
|
|
}
|
|
|
|
client := &grpcValidatorClient{
|
|
grpcClientManager: newGrpcClientManager(conn, func(_ grpc.ClientConnInterface) eth.BeaconNodeValidatorClient {
|
|
return mock2.NewMockBeaconNodeValidatorClient(ctrl)
|
|
}),
|
|
nodeClient: &grpcNodeClient{
|
|
grpcClientManager: newGrpcClientManager(conn, func(_ grpc.ClientConnInterface) eth.NodeClient {
|
|
return mockNodeClient
|
|
}),
|
|
},
|
|
}
|
|
|
|
result := client.EnsureReady(t.Context())
|
|
assert.Equal(t, tt.expectedResult, result)
|
|
assert.Equal(t, tt.expectedIndex, mockProvider.CurrentIndex)
|
|
})
|
|
}
|
|
}
|