Hook to slot stream instead of block stream on the VC (#13327)

* Hook to slot stream instead of block stream on the VC

* Implement StreamSlots in the BN

* mock update

* fix tests

* don't return from stream

* Terence's review

* deepsource second complain

---------

Co-authored-by: rkapka <rkapka@wp.pl>
This commit is contained in:
Potuz
2023-12-13 20:13:56 -03:00
committed by GitHub
parent c47c52152b
commit 96df81d5c5
29 changed files with 2803 additions and 2139 deletions

View File

@@ -901,93 +901,31 @@ func TestCheckAndLogValidatorStatus_OK(t *testing.T) {
}
}
func TestService_ReceiveBlocks_NilBlock(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
valClient := validatormock.NewMockValidatorClient(ctrl)
v := validator{
blockFeed: new(event.Feed),
validatorClient: valClient,
}
stream := mock2.NewMockBeaconNodeValidatorAltair_StreamBlocksClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
valClient.EXPECT().StreamBlocksAltair(
gomock.Any(),
&ethpb.StreamBlocksRequest{VerifiedOnly: true},
).Return(stream, nil)
stream.EXPECT().Context().Return(ctx).AnyTimes()
stream.EXPECT().Recv().Return(
&ethpb.StreamBlocksResponse{Block: &ethpb.StreamBlocksResponse_Phase0Block{
Phase0Block: &ethpb.SignedBeaconBlock{}}},
nil,
).Do(func() {
cancel()
})
connectionErrorChannel := make(chan error)
v.ReceiveBlocks(ctx, connectionErrorChannel)
require.Equal(t, primitives.Slot(0), v.highestValidSlot)
}
func TestService_ReceiveBlocks_SetHighest(t *testing.T) {
func TestService_ReceiveSlots_SetHighest(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := validatormock.NewMockValidatorClient(ctrl)
v := validator{
validatorClient: client,
blockFeed: new(event.Feed),
slotFeed: new(event.Feed),
}
stream := mock2.NewMockBeaconNodeValidatorAltair_StreamBlocksClient(ctrl)
stream := mock2.NewMockBeaconNodeValidator_StreamSlotsClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
client.EXPECT().StreamBlocksAltair(
client.EXPECT().StreamSlots(
gomock.Any(),
&ethpb.StreamBlocksRequest{VerifiedOnly: true},
&ethpb.StreamSlotsRequest{VerifiedOnly: true},
).Return(stream, nil)
stream.EXPECT().Context().Return(ctx).AnyTimes()
slot := primitives.Slot(100)
stream.EXPECT().Recv().Return(
&ethpb.StreamBlocksResponse{
Block: &ethpb.StreamBlocksResponse_Phase0Block{
Phase0Block: &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: slot, Body: &ethpb.BeaconBlockBody{}}}},
},
&ethpb.StreamSlotsResponse{Slot: 123},
nil,
).Do(func() {
cancel()
})
connectionErrorChannel := make(chan error)
v.ReceiveBlocks(ctx, connectionErrorChannel)
require.Equal(t, slot, v.highestValidSlot)
}
func TestService_ReceiveBlocks_SetHighestDeneb(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := validatormock.NewMockValidatorClient(ctrl)
v := validator{
validatorClient: client,
blockFeed: new(event.Feed),
}
stream := mock2.NewMockBeaconNodeValidatorAltair_StreamBlocksClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
client.EXPECT().StreamBlocksAltair(
gomock.Any(),
&ethpb.StreamBlocksRequest{VerifiedOnly: true},
).Return(stream, nil)
stream.EXPECT().Context().Return(ctx).AnyTimes()
slot := primitives.Slot(100)
stream.EXPECT().Recv().Return(
&ethpb.StreamBlocksResponse{
Block: &ethpb.StreamBlocksResponse_DenebBlock{
DenebBlock: &ethpb.SignedBeaconBlockDeneb{Block: &ethpb.BeaconBlockDeneb{Slot: slot, Body: &ethpb.BeaconBlockBodyDeneb{}}}},
},
nil,
).Do(func() {
cancel()
})
connectionErrorChannel := make(chan error)
v.ReceiveBlocks(ctx, connectionErrorChannel)
require.Equal(t, slot, v.highestValidSlot)
v.ReceiveSlots(ctx, connectionErrorChannel)
require.Equal(t, primitives.Slot(123), v.highestValidSlot)
}
type doppelGangerRequestMatcher struct {