Use dependent root to request duties (#15142)

* Use dependent root to request duties

* Add actual roots to the event stream

* Add feature flag

* fix no go

* fix test

* add test

* log duties changes on reorgs

* send depdendent root on grpc response

* fix wrong return status in tests

* fix tests

* gazelle

* add unil wait for wg

* parse slot twice

* add slot deadline to update duties

* fix conflict

* add dependency

* lint

* Thanks James!

* fix segfault

* fix rpc tests

---------

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
This commit is contained in:
Potuz
2025-04-11 16:22:57 -05:00
committed by GitHub
parent 5a527a15c8
commit bab898d1d3
35 changed files with 3948 additions and 3311 deletions

View File

@@ -51,6 +51,7 @@ type ForkchoiceFetcher interface {
ProposerBoost() [32]byte
RecentBlockSlot(root [32]byte) (primitives.Slot, error)
IsCanonical(ctx context.Context, blockRoot [32]byte) (bool, error)
DependentRoot(primitives.Epoch) ([32]byte, error)
}
// TimeFetcher retrieves the Ethereum consensus data that's related to time.

View File

@@ -448,6 +448,11 @@ func (s *ChainService) IsCanonical(_ context.Context, r [32]byte) (bool, error)
return true, nil
}
// DependentRoot mocks the base method in the chain service.
func (*ChainService) DependentRoot(_ primitives.Epoch) ([32]byte, error) {
return [32]byte{}, nil
}
// HasBlock mocks the same method in the chain service.
func (s *ChainService) HasBlock(ctx context.Context, rt [32]byte) bool {
if s.DB == nil {

View File

@@ -9,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -51,7 +52,7 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp
// Deprecated: gRPC API will still be supported for some time, most likely until v8 in 2026, but will be eventually removed in favor of REST API.
//
// StreamSlots sends a block's slot to clients every single time a block is received by the beacon node.
// StreamSlots sends a the block's slot and dependent roots to clients every single time a block is received by the beacon node.
func (vs *Server) StreamSlots(req *ethpb.StreamSlotsRequest, stream ethpb.BeaconNodeValidator_StreamSlotsServer) error {
ch := make(chan *feed.Event, 1)
var sub event.Subscription
@@ -85,7 +86,24 @@ func (vs *Server) StreamSlots(req *ethpb.StreamSlotsRequest, stream ethpb.Beacon
}
s = data.SignedBlock.Block().Slot()
}
if err := stream.Send(&ethpb.StreamSlotsResponse{Slot: s}); err != nil {
currEpoch := slots.ToEpoch(s)
currDepRoot, err := vs.ForkchoiceFetcher.DependentRoot(currEpoch)
if err != nil {
return status.Errorf(codes.Internal, "Could not get dependent root: %v", err)
}
prevDepRoot := currDepRoot
if currEpoch > 0 {
prevDepRoot, err = vs.ForkchoiceFetcher.DependentRoot(currEpoch - 1)
if err != nil {
return status.Errorf(codes.Internal, "Could not get dependent root: %v", err)
}
}
if err := stream.Send(
&ethpb.StreamSlotsResponse{
Slot: s,
PreviousDutyDependentRoot: prevDepRoot[:],
CurrentDutyDependentRoot: currDepRoot[:],
}); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
case <-sub.Err():

View File

@@ -297,15 +297,20 @@ func TestServer_StreamSlots_OnHeadUpdated(t *testing.T) {
chainService := &chainMock.ChainService{}
server := &Server{
Ctx: ctx,
BlockNotifier: chainService.BlockNotifier(),
Ctx: ctx,
ForkchoiceFetcher: chainService,
BlockNotifier: chainService.BlockNotifier(),
}
exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mock.NewMockBeaconNodeValidator_StreamSlotsServer(ctrl)
mockStream.EXPECT().Send(&ethpb.StreamSlotsResponse{Slot: 123}).Do(func(arg0 interface{}) {
mockStream.EXPECT().Send(&ethpb.StreamSlotsResponse{
Slot: 123,
PreviousDutyDependentRoot: params.BeaconConfig().ZeroHash[:],
CurrentDutyDependentRoot: params.BeaconConfig().ZeroHash[:],
}).Do(func(arg0 interface{}) {
exitRoutine <- true
})
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
@@ -329,14 +334,19 @@ func TestServer_StreamSlotsVerified_OnHeadUpdated(t *testing.T) {
ctx := context.Background()
chainService := &chainMock.ChainService{}
server := &Server{
Ctx: ctx,
StateNotifier: chainService.StateNotifier(),
Ctx: ctx,
ForkchoiceFetcher: chainService,
StateNotifier: chainService.StateNotifier(),
}
exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mock.NewMockBeaconNodeValidator_StreamSlotsServer(ctrl)
mockStream.EXPECT().Send(&ethpb.StreamSlotsResponse{Slot: 123}).Do(func(arg0 interface{}) {
mockStream.EXPECT().Send(&ethpb.StreamSlotsResponse{
Slot: 123,
PreviousDutyDependentRoot: params.BeaconConfig().ZeroHash[:],
CurrentDutyDependentRoot: params.BeaconConfig().ZeroHash[:],
}).Do(func(arg0 interface{}) {
exitRoutine <- true
})
mockStream.EXPECT().Context().Return(ctx).AnyTimes()

View File

@@ -159,9 +159,22 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
validatorAssignments = append(validatorAssignments, assignment)
nextValidatorAssignments = append(nextValidatorAssignments, nextAssignment)
}
currDependentRoot, err := vs.ForkchoiceFetcher.DependentRoot(currentEpoch)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get dependent root: %v", err)
}
prevDependentRoot := currDependentRoot
if currDependentRoot != [32]byte{} && currentEpoch > 0 {
prevDependentRoot, err = vs.ForkchoiceFetcher.DependentRoot(currentEpoch - 1)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get previous dependent root: %v", err)
}
}
return &ethpb.DutiesResponse{
CurrentEpochDuties: validatorAssignments,
NextEpochDuties: nextValidatorAssignments,
PreviousDutyDependentRoot: prevDependentRoot[:],
CurrentDutyDependentRoot: currDependentRoot[:],
CurrentEpochDuties: validatorAssignments,
NextEpochDuties: nextValidatorAssignments,
}, nil
}

View File

@@ -55,10 +55,11 @@ func TestGetDuties_OK(t *testing.T) {
State: bs, Root: genesisRoot[:], Genesis: time.Now(),
}
vs := &Server{
HeadFetcher: chain,
TimeFetcher: chain,
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
HeadFetcher: chain,
TimeFetcher: chain,
ForkchoiceFetcher: chain,
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
}
// Test the first validator in registry.
@@ -140,11 +141,12 @@ func TestGetAltairDuties_SyncCommitteeOK(t *testing.T) {
State: bs, Root: genesisRoot[:], Genesis: time.Now().Add(time.Duration(-1*int64(slot-1)) * time.Second),
}
vs := &Server{
HeadFetcher: chain,
TimeFetcher: chain,
Eth1InfoFetcher: &mockExecution.Chain{},
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
HeadFetcher: chain,
TimeFetcher: chain,
ForkchoiceFetcher: chain,
Eth1InfoFetcher: &mockExecution.Chain{},
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
}
// Test the first validator in registry.
@@ -246,11 +248,12 @@ func TestGetBellatrixDuties_SyncCommitteeOK(t *testing.T) {
State: bs, Root: genesisRoot[:], Genesis: time.Now().Add(time.Duration(-1*int64(slot-1)) * time.Second),
}
vs := &Server{
HeadFetcher: chain,
TimeFetcher: chain,
Eth1InfoFetcher: &mockExecution.Chain{},
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
HeadFetcher: chain,
TimeFetcher: chain,
ForkchoiceFetcher: chain,
Eth1InfoFetcher: &mockExecution.Chain{},
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
}
// Test the first validator in registry.
@@ -338,12 +341,13 @@ func TestGetAltairDuties_UnknownPubkey(t *testing.T) {
require.NoError(t, err)
vs := &Server{
HeadFetcher: chain,
TimeFetcher: chain,
Eth1InfoFetcher: &mockExecution.Chain{},
SyncChecker: &mockSync.Sync{IsSyncing: false},
DepositFetcher: depositCache,
PayloadIDCache: cache.NewPayloadIDCache(),
HeadFetcher: chain,
ForkchoiceFetcher: chain,
TimeFetcher: chain,
Eth1InfoFetcher: &mockExecution.Chain{},
SyncChecker: &mockSync.Sync{IsSyncing: false},
DepositFetcher: depositCache,
PayloadIDCache: cache.NewPayloadIDCache(),
}
unknownPubkey := bytesutil.PadTo([]byte{'u'}, 48)
@@ -361,7 +365,8 @@ func TestGetDuties_SlotOutOfUpperBound(t *testing.T) {
Genesis: time.Now(),
}
vs := &Server{
TimeFetcher: chain,
ForkchoiceFetcher: chain,
TimeFetcher: chain,
}
req := &ethpb.DutiesRequest{
Epoch: primitives.Epoch(chain.CurrentSlot()/params.BeaconConfig().SlotsPerEpoch + 2),
@@ -396,10 +401,11 @@ func TestGetDuties_CurrentEpoch_ShouldNotFail(t *testing.T) {
State: bState, Root: genesisRoot[:], Genesis: time.Now(),
}
vs := &Server{
HeadFetcher: chain,
TimeFetcher: chain,
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
HeadFetcher: chain,
ForkchoiceFetcher: chain,
TimeFetcher: chain,
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
}
// Test the first validator in registry.
@@ -435,10 +441,11 @@ func TestGetDuties_MultipleKeys_OK(t *testing.T) {
State: bs, Root: genesisRoot[:], Genesis: time.Now(),
}
vs := &Server{
HeadFetcher: chain,
TimeFetcher: chain,
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
HeadFetcher: chain,
ForkchoiceFetcher: chain,
TimeFetcher: chain,
SyncChecker: &mockSync.Sync{IsSyncing: false},
PayloadIDCache: cache.NewPayloadIDCache(),
}
pubkey0 := deposits[0].Data.PublicKey