From b7e0819f00ede17545eb9c4a7c2651b5707aeddd Mon Sep 17 00:00:00 2001 From: Potuz Date: Fri, 22 Dec 2023 15:47:51 -0300 Subject: [PATCH] refactor Payload Id caches (#12987) * init - getLocalPayload does not use the proposer ID from the cache but takes it from the block - Fixed tests in blockchain package - Fixed tests in the RPC package - Fixed spectests EpochProposers takes 256 bytes that can be avoided to be copied, but this optimization is not clear to be worth it. assginmentStatus can be optimized to use the cached version from the TrackedValidatorsCache We shouldn't cache the proposer duties when calling getDuties but when we update the epoch boundary instead * track validators on prepare proposers * more rpc tests * more rpc tests * initialize grpc caches * Add back fcu log Also fix two existing bugs wrong parent hash on pre Capella and wrong blockhashes on altair * use beacon default fee recipient if there is none in the vc * fix validator test * radek's review * push always proposer settings even if no flag is specified in the VC * Only register with the builder if the VC flag is set Great find by @terencechain * add regression test * Radek's review * change signature of registration builder --- beacon-chain/blockchain/BUILD.bazel | 2 +- beacon-chain/blockchain/execution_engine.go | 64 ++++------ .../blockchain/execution_engine_test.go | 85 ++++++-------- .../blockchain/forkchoice_update_execution.go | 19 ++- .../forkchoice_update_execution_test.go | 24 ++-- beacon-chain/blockchain/options.go | 14 ++- beacon-chain/blockchain/process_block.go | 14 ++- beacon-chain/blockchain/process_block_test.go | 2 +- beacon-chain/blockchain/receive_block_test.go | 5 +- beacon-chain/blockchain/service.go | 5 +- beacon-chain/blockchain/service_test.go | 2 +- beacon-chain/blockchain/setup_test.go | 2 + beacon-chain/blockchain/tracked_proposer.go | 23 ++++ beacon-chain/cache/BUILD.bazel | 1 + beacon-chain/cache/payload_id.go | 111 +++++++----------- beacon-chain/cache/payload_id_test.go | 53 ++++----- .../cache/proposer_indices_disabled.go | 15 +++ beacon-chain/cache/tracked_validators.go | 43 +++++++ beacon-chain/core/helpers/beacon_committee.go | 2 +- beacon-chain/core/helpers/validators.go | 54 +++++++-- beacon-chain/node/node.go | 12 +- beacon-chain/rpc/eth/validator/BUILD.bazel | 3 - beacon-chain/rpc/eth/validator/handlers.go | 38 ++---- .../rpc/eth/validator/handlers_test.go | 88 +++++--------- beacon-chain/rpc/eth/validator/server.go | 3 +- .../prysm/v1alpha1/validator/assignments.go | 14 +-- .../v1alpha1/validator/assignments_test.go | 87 +++++++------- .../rpc/prysm/v1alpha1/validator/proposer.go | 57 +++++---- .../validator/proposer_bellatrix_test.go | 3 +- .../validator/proposer_execution_payload.go | 56 ++++----- .../proposer_execution_payload_test.go | 40 ++++--- .../prysm/v1alpha1/validator/proposer_test.go | 31 +++-- .../rpc/prysm/v1alpha1/validator/server.go | 3 +- beacon-chain/rpc/service.go | 9 +- consensus-types/primitives/BUILD.bazel | 2 + .../primitives/execution_address.go | 3 + consensus-types/primitives/payload_id.go | 4 + .../shared/common/forkchoice/service.go | 3 +- validator/client/BUILD.bazel | 1 - validator/client/runner.go | 26 ++-- validator/client/validator.go | 36 ++---- validator/client/validator_test.go | 27 +++-- 42 files changed, 533 insertions(+), 553 deletions(-) create mode 100644 beacon-chain/blockchain/tracked_proposer.go create mode 100644 beacon-chain/cache/tracked_validators.go create mode 100644 consensus-types/primitives/execution_address.go create mode 100644 consensus-types/primitives/payload_id.go diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 858158c4f5..561d4fd699 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "receive_blob.go", "receive_block.go", "service.go", + "tracked_proposer.go", "weak_subjectivity_checks.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain", @@ -52,7 +53,6 @@ go_library( "//beacon-chain/db:go_default_library", "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/db/filters:go_default_library", - "//beacon-chain/db/kv:go_default_library", "//beacon-chain/execution:go_default_library", "//beacon-chain/forkchoice:go_default_library", "//beacon-chain/forkchoice/doubly-linked-tree:go_default_library", diff --git a/beacon-chain/blockchain/execution_engine.go b/beacon-chain/blockchain/execution_engine.go index 265245a635..7291ed12e9 100644 --- a/beacon-chain/blockchain/execution_engine.go +++ b/beacon-chain/blockchain/execution_engine.go @@ -11,7 +11,6 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/kv" "github.com/prysmaticlabs/prysm/v4/beacon-chain/execution" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" "github.com/prysmaticlabs/prysm/v4/config/features" @@ -74,7 +73,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *notifyForkcho } nextSlot := s.CurrentSlot() + 1 // Cache payload ID for next slot proposer. - hasAttr, attr, proposerId := s.getPayloadAttribute(ctx, arg.headState, nextSlot, arg.headRoot[:]) + hasAttr, attr := s.getPayloadAttribute(ctx, arg.headState, nextSlot, arg.headRoot[:]) payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, attr) if err != nil { @@ -153,7 +152,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *notifyForkcho log.WithError(err).Error("Could not set head root to valid") return nil, nil } - // If the forkchoice update call has an attribute, update the proposer payload ID cache. + // If the forkchoice update call has an attribute, update the payload ID cache. if hasAttr && payloadID != nil { var pId [8]byte copy(pId[:], payloadID[:]) @@ -162,7 +161,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *notifyForkcho "headSlot": headBlk.Slot(), "payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])), }).Info("Forkchoice updated with payload attributes for proposal") - s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(nextSlot, proposerId, pId, arg.headRoot) + s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId) } else if hasAttr && payloadID == nil && !features.Get().PrepareAllPayloads { log.WithFields(logrus.Fields{ "blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()), @@ -277,12 +276,12 @@ func (s *Service) pruneInvalidBlock(ctx context.Context, root, parentRoot, lvh [ // getPayloadAttributes returns the payload attributes for the given state and slot. // The attribute is required to initiate a payload build process in the context of an `engine_forkchoiceUpdated` call. -func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState, slot primitives.Slot, headRoot []byte) (bool, payloadattribute.Attributer, primitives.ValidatorIndex) { +func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState, slot primitives.Slot, headRoot []byte) (bool, payloadattribute.Attributer) { emptyAttri := payloadattribute.EmptyWithVersion(st.Version()) - // Root is `[32]byte{}` since we are retrieving proposer ID of a given slot. During insertion at assignment the root was not known. - proposerID, _, ok := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, [32]byte{} /* root */) - if !ok && !features.Get().PrepareAllPayloads { // There's no need to build attribute if there is no proposer for slot. - return false, emptyAttri, 0 + + val, ok := s.trackedProposer(st, slot) + if !ok && !features.Get().PrepareAllPayloads { + return false, emptyAttri } // Get previous randao. @@ -292,41 +291,20 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState, st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, headRoot, slot) if err != nil { log.WithError(err).Error("Could not process slots to get payload attribute") - return false, emptyAttri, 0 + return false, emptyAttri } } prevRando, err := helpers.RandaoMix(st, time.CurrentEpoch(st)) if err != nil { log.WithError(err).Error("Could not get randao mix to get payload attribute") - return false, emptyAttri, 0 - } - - // Get fee recipient. - feeRecipient := params.BeaconConfig().DefaultFeeRecipient - recipient, err := s.cfg.BeaconDB.FeeRecipientByValidatorID(ctx, proposerID) - switch { - case errors.Is(err, kv.ErrNotFoundFeeRecipient): - if feeRecipient.String() == params.BeaconConfig().EthBurnAddressHex { - logrus.WithFields(logrus.Fields{ - "validatorIndex": proposerID, - "burnAddress": params.BeaconConfig().EthBurnAddressHex, - }).Warn("Fee recipient is currently using the burn address, " + - "you will not be rewarded transaction fees on this setting. " + - "Please set a different eth address as the fee recipient. " + - "Please refer to our documentation for instructions") - } - case err != nil: - log.WithError(err).Error("Could not get fee recipient to get payload attribute") - return false, emptyAttri, 0 - default: - feeRecipient = recipient + return false, emptyAttri } // Get timestamp. t, err := slots.ToTime(uint64(s.genesisTime.Unix()), slot) if err != nil { log.WithError(err).Error("Could not get timestamp to get payload attribute") - return false, emptyAttri, 0 + return false, emptyAttri } var attr payloadattribute.Attributer @@ -335,51 +313,51 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState, withdrawals, err := st.ExpectedWithdrawals() if err != nil { log.WithError(err).Error("Could not get expected withdrawals to get payload attribute") - return false, emptyAttri, 0 + return false, emptyAttri } attr, err = payloadattribute.New(&enginev1.PayloadAttributesV3{ Timestamp: uint64(t.Unix()), PrevRandao: prevRando, - SuggestedFeeRecipient: feeRecipient.Bytes(), + SuggestedFeeRecipient: val.FeeRecipient[:], Withdrawals: withdrawals, ParentBeaconBlockRoot: headRoot, }) if err != nil { log.WithError(err).Error("Could not get payload attribute") - return false, emptyAttri, 0 + return false, emptyAttri } case version.Capella: withdrawals, err := st.ExpectedWithdrawals() if err != nil { log.WithError(err).Error("Could not get expected withdrawals to get payload attribute") - return false, emptyAttri, 0 + return false, emptyAttri } attr, err = payloadattribute.New(&enginev1.PayloadAttributesV2{ Timestamp: uint64(t.Unix()), PrevRandao: prevRando, - SuggestedFeeRecipient: feeRecipient.Bytes(), + SuggestedFeeRecipient: val.FeeRecipient[:], Withdrawals: withdrawals, }) if err != nil { log.WithError(err).Error("Could not get payload attribute") - return false, emptyAttri, 0 + return false, emptyAttri } case version.Bellatrix: attr, err = payloadattribute.New(&enginev1.PayloadAttributes{ Timestamp: uint64(t.Unix()), PrevRandao: prevRando, - SuggestedFeeRecipient: feeRecipient.Bytes(), + SuggestedFeeRecipient: val.FeeRecipient[:], }) if err != nil { log.WithError(err).Error("Could not get payload attribute") - return false, emptyAttri, 0 + return false, emptyAttri } default: log.WithField("version", st.Version()).Error("Could not get payload attribute due to unknown state version") - return false, emptyAttri, 0 + return false, emptyAttri } - return true, attr, proposerID + return true, attr } // removeInvalidBlockAndState removes the invalid block, blob and its corresponding state from the cache and DB. diff --git a/beacon-chain/blockchain/execution_engine_test.go b/beacon-chain/blockchain/execution_engine_test.go index 9a4bc98586..5a54008cfc 100644 --- a/beacon-chain/blockchain/execution_engine_test.go +++ b/beacon-chain/blockchain/execution_engine_test.go @@ -26,11 +26,10 @@ import ( "github.com/prysmaticlabs/prysm/v4/testing/assert" "github.com/prysmaticlabs/prysm/v4/testing/require" "github.com/prysmaticlabs/prysm/v4/testing/util" - logTest "github.com/sirupsen/logrus/hooks/test" ) func Test_NotifyForkchoiceUpdate_GetPayloadAttrErrorCanContinue(t *testing.T) { - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx, beaconDB, fcs := tr.ctx, tr.db, tr.fcs altairBlk := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlockAltair()) @@ -79,14 +78,14 @@ func Test_NotifyForkchoiceUpdate_GetPayloadAttrErrorCanContinue(t *testing.T) { headBlock: b, } - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(1, 0, [8]byte{}, [32]byte{}) + service.cfg.PayloadIDCache.Set(1, [32]byte{}, [8]byte{}) got, err := service.notifyForkchoiceUpdate(ctx, arg) require.NoError(t, err) require.DeepEqual(t, got, pid) // We still get a payload ID even though the state is bad. This means it returns until the end. } func Test_NotifyForkchoiceUpdate(t *testing.T) { - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx, beaconDB, fcs := tr.ctx, tr.db, tr.fcs altairBlk := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlockAltair()) @@ -246,7 +245,7 @@ func Test_NotifyForkchoiceUpdate(t *testing.T) { } func Test_NotifyForkchoiceUpdate_NIlLVH(t *testing.T) { - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx, beaconDB, fcs := tr.ctx, tr.db, tr.fcs // Prepare blocks @@ -334,7 +333,7 @@ func Test_NotifyForkchoiceUpdate_NIlLVH(t *testing.T) { // 3. the blockchain package calls fcu to obtain heads G -> F -> D. func Test_NotifyForkchoiceUpdateRecursive_DoublyLinkedTree(t *testing.T) { - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx, beaconDB, fcs := tr.ctx, tr.db, tr.fcs // Prepare blocks @@ -467,7 +466,7 @@ func Test_NotifyNewPayload(t *testing.T) { cfg := params.BeaconConfig() cfg.TerminalTotalDifficulty = "2" params.OverrideBeaconConfig(cfg) - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx, fcs := tr.ctx, tr.fcs phase0State, _ := util.DeterministicGenesisState(t, 1) @@ -709,7 +708,7 @@ func Test_NotifyNewPayload_SetOptimisticToValid(t *testing.T) { cfg.TerminalTotalDifficulty = "2" params.OverrideBeaconConfig(cfg) - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx := tr.ctx bellatrixState, _ := util.DeterministicGenesisStateBellatrix(t, 2) @@ -777,83 +776,70 @@ func Test_reportInvalidBlock(t *testing.T) { } func Test_GetPayloadAttribute(t *testing.T) { - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx := tr.ctx st, _ := util.DeterministicGenesisStateBellatrix(t, 1) - hasPayload, _, vId := service.getPayloadAttribute(ctx, st, 0, []byte{}) + hasPayload, _ := service.getPayloadAttribute(ctx, st, 0, []byte{}) require.Equal(t, false, hasPayload) - require.Equal(t, primitives.ValidatorIndex(0), vId) + service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, Index: 0}) // Cache hit, advance state, no fee recipient - suggestedVid := primitives.ValidatorIndex(1) slot := primitives.Slot(1) - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}, [32]byte{}) - hook := logTest.NewGlobal() - hasPayload, attr, vId := service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) + service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{}) + hasPayload, attr := service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) require.Equal(t, true, hasPayload) - require.Equal(t, suggestedVid, vId) require.Equal(t, params.BeaconConfig().EthBurnAddressHex, common.BytesToAddress(attr.SuggestedFeeRecipient()).String()) - require.LogsContain(t, hook, "Fee recipient is currently using the burn address") // Cache hit, advance state, has fee recipient suggestedAddr := common.HexToAddress("123") - require.NoError(t, service.cfg.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, []primitives.ValidatorIndex{suggestedVid}, []common.Address{suggestedAddr})) - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}, [32]byte{}) - hasPayload, attr, vId = service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) + service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, FeeRecipient: primitives.ExecutionAddress(suggestedAddr), Index: 0}) + service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{}) + hasPayload, attr = service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) require.Equal(t, true, hasPayload) - require.Equal(t, suggestedVid, vId) require.Equal(t, suggestedAddr, common.BytesToAddress(attr.SuggestedFeeRecipient())) } func Test_GetPayloadAttribute_PrepareAllPayloads(t *testing.T) { - hook := logTest.NewGlobal() resetCfg := features.InitWithReset(&features.Flags{ PrepareAllPayloads: true, }) defer resetCfg() - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx := tr.ctx st, _ := util.DeterministicGenesisStateBellatrix(t, 1) - hasPayload, attr, vId := service.getPayloadAttribute(ctx, st, 0, []byte{}) + hasPayload, attr := service.getPayloadAttribute(ctx, st, 0, []byte{}) require.Equal(t, true, hasPayload) - require.Equal(t, primitives.ValidatorIndex(0), vId) require.Equal(t, params.BeaconConfig().EthBurnAddressHex, common.BytesToAddress(attr.SuggestedFeeRecipient()).String()) - require.LogsContain(t, hook, "Fee recipient is currently using the burn address") } func Test_GetPayloadAttributeV2(t *testing.T) { - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx := tr.ctx st, _ := util.DeterministicGenesisStateCapella(t, 1) - hasPayload, _, vId := service.getPayloadAttribute(ctx, st, 0, []byte{}) + hasPayload, _ := service.getPayloadAttribute(ctx, st, 0, []byte{}) require.Equal(t, false, hasPayload) - require.Equal(t, primitives.ValidatorIndex(0), vId) // Cache hit, advance state, no fee recipient - suggestedVid := primitives.ValidatorIndex(1) + service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, Index: 0}) slot := primitives.Slot(1) - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}, [32]byte{}) - hook := logTest.NewGlobal() - hasPayload, attr, vId := service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) + service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{}) + hasPayload, attr := service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) require.Equal(t, true, hasPayload) - require.Equal(t, suggestedVid, vId) require.Equal(t, params.BeaconConfig().EthBurnAddressHex, common.BytesToAddress(attr.SuggestedFeeRecipient()).String()) - require.LogsContain(t, hook, "Fee recipient is currently using the burn address") a, err := attr.Withdrawals() require.NoError(t, err) require.Equal(t, 0, len(a)) // Cache hit, advance state, has fee recipient suggestedAddr := common.HexToAddress("123") - require.NoError(t, service.cfg.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, []primitives.ValidatorIndex{suggestedVid}, []common.Address{suggestedAddr})) - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}, [32]byte{}) - hasPayload, attr, vId = service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) + service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, FeeRecipient: primitives.ExecutionAddress(suggestedAddr), Index: 0}) + service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{}) + hasPayload, attr = service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) require.Equal(t, true, hasPayload) - require.Equal(t, suggestedVid, vId) require.Equal(t, suggestedAddr, common.BytesToAddress(attr.SuggestedFeeRecipient())) a, err = attr.Withdrawals() require.NoError(t, err) @@ -861,35 +847,30 @@ func Test_GetPayloadAttributeV2(t *testing.T) { } func Test_GetPayloadAttributeDeneb(t *testing.T) { - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx := tr.ctx st, _ := util.DeterministicGenesisStateDeneb(t, 1) - hasPayload, _, vId := service.getPayloadAttribute(ctx, st, 0, []byte{}) + hasPayload, _ := service.getPayloadAttribute(ctx, st, 0, []byte{}) require.Equal(t, false, hasPayload) - require.Equal(t, primitives.ValidatorIndex(0), vId) // Cache hit, advance state, no fee recipient - suggestedVid := primitives.ValidatorIndex(1) slot := primitives.Slot(1) - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}, [32]byte{}) - hook := logTest.NewGlobal() - hasPayload, attr, vId := service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) + service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, Index: 0}) + service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{}) + hasPayload, attr := service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) require.Equal(t, true, hasPayload) - require.Equal(t, suggestedVid, vId) require.Equal(t, params.BeaconConfig().EthBurnAddressHex, common.BytesToAddress(attr.SuggestedFeeRecipient()).String()) - require.LogsContain(t, hook, "Fee recipient is currently using the burn address") a, err := attr.Withdrawals() require.NoError(t, err) require.Equal(t, 0, len(a)) // Cache hit, advance state, has fee recipient suggestedAddr := common.HexToAddress("123") - require.NoError(t, service.cfg.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, []primitives.ValidatorIndex{suggestedVid}, []common.Address{suggestedAddr})) - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}, [32]byte{}) - hasPayload, attr, vId = service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) + service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, FeeRecipient: primitives.ExecutionAddress(suggestedAddr), Index: 0}) + service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{}) + hasPayload, attr = service.getPayloadAttribute(ctx, st, slot, params.BeaconConfig().ZeroHash[:]) require.Equal(t, true, hasPayload) - require.Equal(t, suggestedVid, vId) require.Equal(t, suggestedAddr, common.BytesToAddress(attr.SuggestedFeeRecipient())) a, err = attr.Withdrawals() require.NoError(t, err) diff --git a/beacon-chain/blockchain/forkchoice_update_execution.go b/beacon-chain/blockchain/forkchoice_update_execution.go index fd4e85d83b..1efbf94a76 100644 --- a/beacon-chain/blockchain/forkchoice_update_execution.go +++ b/beacon-chain/blockchain/forkchoice_update_execution.go @@ -17,11 +17,6 @@ import ( "go.opencensus.io/trace" ) -func (s *Service) isNewProposer(slot primitives.Slot) bool { - _, _, ok := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, [32]byte{} /* root */) - return ok || features.Get().PrepareAllPayloads -} - func (s *Service) isNewHead(r [32]byte) bool { s.headLock.RLock() defer s.headLock.RUnlock() @@ -61,18 +56,20 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, newHeadRoot if !isNewHead { return false, nil } - isNewProposer := s.isNewProposer(proposingSlot) - if isNewProposer && !features.Get().DisableReorgLateBlocks { - if s.shouldOverrideFCU(newHeadRoot, proposingSlot) { - return false, nil - } - } + headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot) if err != nil { log.WithError(err).Error("Could not get forkchoice update argument") return false, nil } + _, tracked := s.trackedProposer(headState, proposingSlot) + if (tracked || features.Get().PrepareAllPayloads) && !features.Get().DisableReorgLateBlocks { + if s.shouldOverrideFCU(newHeadRoot, proposingSlot) { + return false, nil + } + } + _, err = s.notifyForkchoiceUpdate(ctx, ¬ifyForkchoiceUpdateArg{ headState: headState, headRoot: newHeadRoot, diff --git a/beacon-chain/blockchain/forkchoice_update_execution_test.go b/beacon-chain/blockchain/forkchoice_update_execution_test.go index 95e7ad0dca..36ef8a13e1 100644 --- a/beacon-chain/blockchain/forkchoice_update_execution_test.go +++ b/beacon-chain/blockchain/forkchoice_update_execution_test.go @@ -17,15 +17,6 @@ import ( logTest "github.com/sirupsen/logrus/hooks/test" ) -func TestService_isNewProposer(t *testing.T) { - beaconDB := testDB.SetupDB(t) - service := setupBeaconChain(t, beaconDB) - require.Equal(t, false, service.isNewProposer(service.CurrentSlot()+1)) - - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(service.CurrentSlot()+1, 0, [8]byte{}, [32]byte{} /* root */) - require.Equal(t, true, service.isNewProposer(service.CurrentSlot()+1)) -} - func TestService_isNewHead(t *testing.T) { beaconDB := testDB.SetupDB(t) service := setupBeaconChain(t, beaconDB) @@ -73,7 +64,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) { service, err := NewService(ctx, opts...) require.NoError(t, err) - service.cfg.ProposerSlotIndexCache = cache.NewProposerPayloadIDsCache() + service.cfg.PayloadIDCache = cache.NewPayloadIDCache() _, err = service.forkchoiceUpdateWithExecution(ctx, service.headRoot(), service.CurrentSlot()+1) require.NoError(t, err) hookErr := "could not notify forkchoice update" @@ -107,7 +98,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) { block: wsb, state: st, } - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1}, [32]byte{2}) + service.cfg.PayloadIDCache.Set(2, [32]byte{2}, [8]byte{1}) _, err = service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()) require.NoError(t, err) require.LogsDoNotContain(t, hook, invalidStateErr) @@ -125,15 +116,14 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) { block: wsb, state: st, } - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1}, [32]byte{2}) + service.cfg.PayloadIDCache.Set(2, [32]byte{2}, [8]byte{1}) _, err = service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()+1) require.NoError(t, err) require.LogsDoNotContain(t, hook, invalidStateErr) require.LogsDoNotContain(t, hook, hookErr) - vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2, [32]byte{2}) + payloadID, has := service.cfg.PayloadIDCache.PayloadID(2, [32]byte{2}) require.Equal(t, true, has) - require.Equal(t, primitives.ValidatorIndex(1), vId) - require.Equal(t, [8]byte{1}, payloadID) + require.Equal(t, primitives.PayloadID{1}, payloadID) // Test zero headRoot returns immediately. headRoot := service.headRoot() @@ -143,7 +133,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) { } func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testing.T) { - service, tr := minimalTestService(t) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx, beaconDB, fcs := tr.ctx, tr.db, tr.fcs altairBlk := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlockAltair()) @@ -182,7 +172,7 @@ func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testin service.head.root = r service.head.block = sb service.head.state = st - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(service.CurrentSlot()+1, 0, [8]byte{}, [32]byte{} /* root */) + service.cfg.PayloadIDCache.Set(service.CurrentSlot()+1, [32]byte{} /* root */, [8]byte{}) _, err = service.forkchoiceUpdateWithExecution(ctx, r, service.CurrentSlot()+1) require.NoError(t, err) diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index c8333b40e8..72110b3c75 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -69,10 +69,18 @@ func WithDepositCache(c cache.DepositCache) Option { } } -// WithProposerIdsCache for proposer id cache. -func WithProposerIdsCache(c *cache.ProposerPayloadIDsCache) Option { +// WithPayloadIDCache for payload ID cache. +func WithPayloadIDCache(c *cache.PayloadIDCache) Option { return func(s *Service) error { - s.cfg.ProposerSlotIndexCache = c + s.cfg.PayloadIDCache = c + return nil + } +} + +// WithTrackedValidatorsCache for tracked validators cache. +func WithTrackedValidatorsCache(c *cache.TrackedValidatorsCache) Option { + return func(s *Service) error { + s.cfg.TrackedValidatorsCache = c return nil } } diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 27162bc3f0..d02ba41cd4 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -674,13 +674,17 @@ func (s *Service) lateBlockTasks(ctx context.Context) { log.WithError(err).Error("lateBlockTasks: could not update epoch boundary caches") } s.cfg.ForkChoiceStore.RUnlock() - // Head root should be empty when retrieving proposer index for the next slot. - _, id, has := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(s.CurrentSlot()+1, [32]byte{} /* head root */) - // There exists proposer for next slot, but we haven't called fcu w/ payload attribute yet. - if (!has && !features.Get().PrepareAllPayloads) || id != [8]byte{} { + _, tracked := s.trackedProposer(headState, s.CurrentSlot()+1) + // return early if we are not proposing next slot. + if !tracked && !features.Get().PrepareAllPayloads { + return + } + // return early if we already started building a block for the current + // head root + _, has := s.cfg.PayloadIDCache.PayloadID(s.CurrentSlot()+1, headRoot) + if has { return } - s.headLock.RLock() headBlock, err := s.headBlock() if err != nil { diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index 95c164b2cb..f313d50105 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -895,7 +895,7 @@ func Test_validateMergeTransitionBlock(t *testing.T) { cfg.TerminalBlockHash = params.BeaconConfig().ZeroHash params.OverrideBeaconConfig(cfg) - service, tr := minimalTestService(t, WithProposerIdsCache(cache.NewProposerPayloadIDsCache())) + service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache())) ctx := tr.ctx aHash := common.BytesToHash([]byte("a")) diff --git a/beacon-chain/blockchain/receive_block_test.go b/beacon-chain/blockchain/receive_block_test.go index 8576e5ea76..5f13536224 100644 --- a/beacon-chain/blockchain/receive_block_test.go +++ b/beacon-chain/blockchain/receive_block_test.go @@ -7,6 +7,7 @@ import ( "time" blockchainTesting "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -130,7 +131,9 @@ func TestService_ReceiveBlock(t *testing.T) { s, tr := minimalTestService(t, WithFinalizedStateAtStartUp(genesis), WithExitPool(voluntaryexits.NewPool()), - WithStateNotifier(&blockchainTesting.MockStateNotifier{RecordEvents: true})) + WithStateNotifier(&blockchainTesting.MockStateNotifier{RecordEvents: true}), + WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()), + ) beaconDB := tr.db genesisBlockRoot := bytesutil.ToBytes32(nil) diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index e3ee4dd540..33ada77306 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -73,7 +73,8 @@ type config struct { ChainStartFetcher execution.ChainStartFetcher BeaconDB db.HeadAccessDatabase DepositCache cache.DepositCache - ProposerSlotIndexCache *cache.ProposerPayloadIDsCache + PayloadIDCache *cache.PayloadIDCache + TrackedValidatorsCache *cache.TrackedValidatorsCache AttPool attestations.Pool ExitPool voluntaryexits.PoolManager SlashingPool slashings.PoolManager @@ -167,7 +168,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) { checkpointStateCache: cache.NewCheckpointStateCache(), initSyncBlocks: make(map[[32]byte]interfaces.ReadOnlySignedBeaconBlock), blobNotifiers: bn, - cfg: &config{ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache()}, + cfg: &config{}, blockBeingSynced: ¤tlySyncingBlock{roots: make(map[[32]byte]struct{})}, } for _, opt := range opts { diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index 99b6498177..87d104cddb 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -100,7 +100,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service { WithForkChoiceStore(fc), WithAttestationService(attService), WithStateGen(stateGen), - WithProposerIdsCache(cache.NewProposerPayloadIDsCache()), + WithPayloadIDCache(cache.NewPayloadIDCache()), WithClockSynchronizer(startup.NewClockSynchronizer()), } diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index bb771f17c6..bbdae002e8 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/prysmaticlabs/prysm/v4/async/event" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositcache" statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" @@ -114,6 +115,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq WithAttestationService(req.attSrv), WithBLSToExecPool(req.blsPool), WithDepositCache(dc), + WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()), } // append the variadic opts so they override the defaults by being processed afterwards opts = append(defOpts, opts...) diff --git a/beacon-chain/blockchain/tracked_proposer.go b/beacon-chain/blockchain/tracked_proposer.go new file mode 100644 index 0000000000..c411c5e030 --- /dev/null +++ b/beacon-chain/blockchain/tracked_proposer.go @@ -0,0 +1,23 @@ +package blockchain + +import ( + "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" +) + +// trackedProposer returns whether the beacon node was informed, via the +// validators/prepare_proposer endpoint, of the proposer at the given slot. +// It only returns true if the tracked proposer is present and active. +func (s *Service) trackedProposer(st state.ReadOnlyBeaconState, slot primitives.Slot) (cache.TrackedValidator, bool) { + id, err := helpers.BeaconProposerIndexAtSlot(s.ctx, st, slot) + if err != nil { + return cache.TrackedValidator{}, false + } + val, ok := s.cfg.TrackedValidatorsCache.Validator(id) + if !ok { + return cache.TrackedValidator{}, false + } + return val, val.Active +} diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index 39819bce66..a6a5d29132 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "sync_committee_disabled.go", # keep "sync_committee_head_state.go", "sync_subnet_ids.go", + "tracked_validators.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache", visibility = [ diff --git a/beacon-chain/cache/payload_id.go b/beacon-chain/cache/payload_id.go index 69d7ff6ee0..98f108f499 100644 --- a/beacon-chain/cache/payload_id.go +++ b/beacon-chain/cache/payload_id.go @@ -1,94 +1,63 @@ package cache import ( - "bytes" "sync" - fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" - "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ) -const keyLength = 40 -const vIdLength = 8 -const pIdLength = 8 -const vpIdsLength = vIdLength + pIdLength +// RootToPayloadIDMap is a map with keys the head root and values the +// corresponding PayloadID +type RootToPayloadIDMap map[[32]byte]primitives.PayloadID -// ProposerPayloadIDsCache is a cache of proposer payload IDs. -// The key is the concatenation of the slot and the block root. -// The value is the concatenation of the proposer and payload IDs, 8 bytes each. -type ProposerPayloadIDsCache struct { - slotToProposerAndPayloadIDs map[[keyLength]byte][vpIdsLength]byte - sync.RWMutex +// PayloadIDCache is a cache that keeps track of the prepared payload ID for the +// given slot and with the given head root. +type PayloadIDCache struct { + slotToPayloadID map[primitives.Slot]RootToPayloadIDMap + sync.Mutex } -// NewProposerPayloadIDsCache creates a new proposer payload IDs cache. -func NewProposerPayloadIDsCache() *ProposerPayloadIDsCache { - return &ProposerPayloadIDsCache{ - slotToProposerAndPayloadIDs: make(map[[keyLength]byte][vpIdsLength]byte), - } +// NewPayloadIDCache returns a new payload ID cache +func NewPayloadIDCache() *PayloadIDCache { + return &PayloadIDCache{slotToPayloadID: make(map[primitives.Slot]RootToPayloadIDMap)} } -// GetProposerPayloadIDs returns the proposer and payload IDs for the given slot and head root to build the block. -func (f *ProposerPayloadIDsCache) GetProposerPayloadIDs( - slot primitives.Slot, - r [fieldparams.RootLength]byte, -) (primitives.ValidatorIndex, [pIdLength]byte, bool) { - f.RLock() - defer f.RUnlock() - ids, ok := f.slotToProposerAndPayloadIDs[idKey(slot, r)] +// PayloadID returns the payload ID for the given slot and parent block root +func (p *PayloadIDCache) PayloadID(slot primitives.Slot, root [32]byte) (primitives.PayloadID, bool) { + p.Lock() + defer p.Unlock() + inner, ok := p.slotToPayloadID[slot] if !ok { - return 0, [pIdLength]byte{}, false + return primitives.PayloadID{}, false } - vId := ids[:vIdLength] - - b := ids[vIdLength:] - var pId [pIdLength]byte - copy(pId[:], b) - - return primitives.ValidatorIndex(bytesutil.BytesToUint64BigEndian(vId)), pId, true + pid, ok := inner[root] + if !ok { + return primitives.PayloadID{}, false + } + return pid, true } -// SetProposerAndPayloadIDs sets the proposer and payload IDs for the given slot and head root to build block. -func (f *ProposerPayloadIDsCache) SetProposerAndPayloadIDs( - slot primitives.Slot, - vId primitives.ValidatorIndex, - pId [pIdLength]byte, - r [fieldparams.RootLength]byte, -) { - f.Lock() - defer f.Unlock() - var vIdBytes [vIdLength]byte - copy(vIdBytes[:], bytesutil.Uint64ToBytesBigEndian(uint64(vId))) - - var bs [vpIdsLength]byte - copy(bs[:], append(vIdBytes[:], pId[:]...)) - - k := idKey(slot, r) - ids, ok := f.slotToProposerAndPayloadIDs[k] - // Ok to overwrite if the slot is already set but the cached payload ID is not set. - // This combats the re-org case where payload assignment could change at the start of the epoch. - var byte8 [vIdLength]byte - if !ok || (ok && bytes.Equal(ids[vIdLength:], byte8[:])) { - f.slotToProposerAndPayloadIDs[k] = bs +// SetPayloadID updates the payload ID for the given slot and head root +// it also prunes older entries in the cache +func (p *PayloadIDCache) Set(slot primitives.Slot, root [32]byte, pid primitives.PayloadID) { + p.Lock() + defer p.Unlock() + if slot > 1 { + p.prune(slot - 2) } + inner, ok := p.slotToPayloadID[slot] + if !ok { + inner = make(RootToPayloadIDMap) + p.slotToPayloadID[slot] = inner + } + inner[root] = pid } -// PrunePayloadIDs removes the payload ID entries older than input slot. -func (f *ProposerPayloadIDsCache) PrunePayloadIDs(slot primitives.Slot) { - f.Lock() - defer f.Unlock() - - for k := range f.slotToProposerAndPayloadIDs { - s := primitives.Slot(bytesutil.BytesToUint64BigEndian(k[:8])) - if slot > s { - delete(f.slotToProposerAndPayloadIDs, k) +// Prune prunes old payload IDs. Requires a Lock in the cache +func (p *PayloadIDCache) prune(slot primitives.Slot) { + for key := range p.slotToPayloadID { + if key < slot { + delete(p.slotToPayloadID, key) } } } - -func idKey(slot primitives.Slot, r [fieldparams.RootLength]byte) [keyLength]byte { - var k [keyLength]byte - copy(k[:], append(bytesutil.Uint64ToBytesBigEndian(uint64(slot)), r[:]...)) - return k -} diff --git a/beacon-chain/cache/payload_id_test.go b/beacon-chain/cache/payload_id_test.go index 89f17b15b3..f817c4da47 100644 --- a/beacon-chain/cache/payload_id_test.go +++ b/beacon-chain/cache/payload_id_test.go @@ -8,65 +8,54 @@ import ( ) func TestValidatorPayloadIDsCache_GetAndSaveValidatorPayloadIDs(t *testing.T) { - cache := NewProposerPayloadIDsCache() + cache := NewPayloadIDCache() var r [32]byte - i, p, ok := cache.GetProposerPayloadIDs(0, r) + p, ok := cache.PayloadID(0, r) require.Equal(t, false, ok) - require.Equal(t, primitives.ValidatorIndex(0), i) - require.Equal(t, [pIdLength]byte{}, p) + require.Equal(t, primitives.PayloadID{}, p) slot := primitives.Slot(1234) - vid := primitives.ValidatorIndex(34234324) - pid := [8]byte{1, 2, 3, 3, 7, 8, 7, 8} + pid := primitives.PayloadID{1, 2, 3, 3, 7, 8, 7, 8} r = [32]byte{1, 2, 3} - cache.SetProposerAndPayloadIDs(slot, vid, pid, r) - i, p, ok = cache.GetProposerPayloadIDs(slot, r) + cache.Set(slot, r, pid) + p, ok = cache.PayloadID(slot, r) require.Equal(t, true, ok) - require.Equal(t, vid, i) require.Equal(t, pid, p) slot = primitives.Slot(9456456) - vid = primitives.ValidatorIndex(6786745) r = [32]byte{4, 5, 6} - cache.SetProposerAndPayloadIDs(slot, vid, [pIdLength]byte{}, r) - i, p, ok = cache.GetProposerPayloadIDs(slot, r) + cache.Set(slot, r, primitives.PayloadID{}) + p, ok = cache.PayloadID(slot, r) require.Equal(t, true, ok) - require.Equal(t, vid, i) - require.Equal(t, [pIdLength]byte{}, p) + require.Equal(t, primitives.PayloadID{}, p) // reset cache without pid slot = primitives.Slot(9456456) - vid = primitives.ValidatorIndex(11111) r = [32]byte{7, 8, 9} pid = [8]byte{3, 2, 3, 33, 72, 8, 7, 8} - cache.SetProposerAndPayloadIDs(slot, vid, pid, r) - i, p, ok = cache.GetProposerPayloadIDs(slot, r) + cache.Set(slot, r, pid) + p, ok = cache.PayloadID(slot, r) require.Equal(t, true, ok) - require.Equal(t, vid, i) require.Equal(t, pid, p) // Forked chain r = [32]byte{1, 2, 3} - i, p, ok = cache.GetProposerPayloadIDs(slot, r) + p, ok = cache.PayloadID(slot, r) require.Equal(t, false, ok) - require.Equal(t, primitives.ValidatorIndex(0), i) - require.Equal(t, [pIdLength]byte{}, p) + require.Equal(t, primitives.PayloadID{}, p) - // existing pid - no change in cache + // existing pid - change the cache slot = primitives.Slot(9456456) - vid = primitives.ValidatorIndex(11111) r = [32]byte{7, 8, 9} - newPid := [8]byte{1, 2, 3, 33, 72, 8, 7, 1} - cache.SetProposerAndPayloadIDs(slot, vid, newPid, r) - i, p, ok = cache.GetProposerPayloadIDs(slot, r) + newPid := primitives.PayloadID{1, 2, 3, 33, 72, 8, 7, 1} + cache.Set(slot, r, newPid) + p, ok = cache.PayloadID(slot, r) require.Equal(t, true, ok) - require.Equal(t, vid, i) - require.Equal(t, pid, p) + require.Equal(t, newPid, p) // remove cache entry - cache.PrunePayloadIDs(slot + 1) - i, p, ok = cache.GetProposerPayloadIDs(slot, r) + cache.prune(slot + 1) + p, ok = cache.PayloadID(slot, r) require.Equal(t, false, ok) - require.Equal(t, primitives.ValidatorIndex(0), i) - require.Equal(t, [pIdLength]byte{}, p) + require.Equal(t, primitives.PayloadID{}, p) } diff --git a/beacon-chain/cache/proposer_indices_disabled.go b/beacon-chain/cache/proposer_indices_disabled.go index f20cf3f89e..69052f9124 100644 --- a/beacon-chain/cache/proposer_indices_disabled.go +++ b/beacon-chain/cache/proposer_indices_disabled.go @@ -4,11 +4,26 @@ package cache import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ) +var ( + // ProposerIndicesCacheMiss tracks the number of proposerIndices requests that aren't present in the cache. + ProposerIndicesCacheMiss = promauto.NewCounter(prometheus.CounterOpts{ + Name: "proposer_indices_cache_miss", + Help: "The number of proposer indices requests that aren't present in the cache.", + }) + // ProposerIndicesCacheHit tracks the number of proposerIndices requests that are in the cache. + ProposerIndicesCacheHit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "proposer_indices_cache_hit", + Help: "The number of proposer indices requests that are present in the cache.", + }) +) + // FakeProposerIndicesCache is a struct with 1 queue for looking up proposer indices by root. type FakeProposerIndicesCache struct { } diff --git a/beacon-chain/cache/tracked_validators.go b/beacon-chain/cache/tracked_validators.go new file mode 100644 index 0000000000..991bb9b66e --- /dev/null +++ b/beacon-chain/cache/tracked_validators.go @@ -0,0 +1,43 @@ +package cache + +import ( + "sync" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" +) + +type TrackedValidator struct { + Active bool + FeeRecipient primitives.ExecutionAddress + Index primitives.ValidatorIndex +} + +type TrackedValidatorsCache struct { + sync.Mutex + trackedValidators map[primitives.ValidatorIndex]TrackedValidator +} + +func NewTrackedValidatorsCache() *TrackedValidatorsCache { + return &TrackedValidatorsCache{ + trackedValidators: make(map[primitives.ValidatorIndex]TrackedValidator), + } +} + +func (t *TrackedValidatorsCache) Validator(index primitives.ValidatorIndex) (TrackedValidator, bool) { + t.Lock() + defer t.Unlock() + val, ok := t.trackedValidators[index] + return val, ok +} + +func (t *TrackedValidatorsCache) Set(val TrackedValidator) { + t.Lock() + defer t.Unlock() + t.trackedValidators[val.Index] = val +} + +func (t *TrackedValidatorsCache) Prune() { + t.Lock() + defer t.Unlock() + t.trackedValidators = make(map[primitives.ValidatorIndex]TrackedValidator) +} diff --git a/beacon-chain/core/helpers/beacon_committee.go b/beacon-chain/core/helpers/beacon_committee.go index 46f1ea701d..0ad139c2da 100644 --- a/beacon-chain/core/helpers/beacon_committee.go +++ b/beacon-chain/core/helpers/beacon_committee.go @@ -346,7 +346,7 @@ func UpdateProposerIndicesInCache(ctx context.Context, state state.ReadOnlyBeaco if err != nil { return err } - root, err := state.StateRootAtIndex(uint64(slot % params.BeaconConfig().SlotsPerHistoricalRoot)) + root, err := StateRootAtSlot(state, slot) if err != nil { return err } diff --git a/beacon-chain/core/helpers/validators.go b/beacon-chain/core/helpers/validators.go index acbb8163fd..c34d884a64 100644 --- a/beacon-chain/core/helpers/validators.go +++ b/beacon-chain/core/helpers/validators.go @@ -20,10 +20,14 @@ import ( "go.opencensus.io/trace" ) -var CommitteeCacheInProgressHit = promauto.NewCounter(prometheus.CounterOpts{ - Name: "committee_cache_in_progress_hit", - Help: "The number of committee requests that are present in the cache.", -}) +var ( + CommitteeCacheInProgressHit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "committee_cache_in_progress_hit", + Help: "The number of committee requests that are present in the cache.", + }) + + errProposerIndexMiss = errors.New("propoposer index not found in cache") +) // IsActiveValidator returns the boolean value on whether the validator // is active or not. @@ -259,10 +263,32 @@ func ValidatorActivationChurnLimitDeneb(activeValidatorCount uint64) uint64 { // indices = get_active_validator_indices(state, epoch) // return compute_proposer_index(state, indices, seed) func BeaconProposerIndex(ctx context.Context, state state.ReadOnlyBeaconState) (primitives.ValidatorIndex, error) { - e := time.CurrentEpoch(state) + return BeaconProposerIndexAtSlot(ctx, state, state.Slot()) +} + +// cachedProposerIndexAtSlot returns the proposer index at the given slot from +// the cache at the given root key. +func cachedProposerIndexAtSlot(slot primitives.Slot, root [32]byte) (primitives.ValidatorIndex, error) { + proposerIndices, has := proposerIndicesCache.ProposerIndices(slots.ToEpoch(slot), root) + if !has { + cache.ProposerIndicesCacheMiss.Inc() + return 0, errProposerIndexMiss + } + if len(proposerIndices) != int(params.BeaconConfig().SlotsPerEpoch) { + cache.ProposerIndicesCacheMiss.Inc() + return 0, errProposerIndexMiss + } + return proposerIndices[slot%params.BeaconConfig().SlotsPerEpoch], nil +} + +// BeaconProposerIndexAtSlot returns proposer index at the given slot from the +// point of view of the given state as head state +func BeaconProposerIndexAtSlot(ctx context.Context, state state.ReadOnlyBeaconState, slot primitives.Slot) (primitives.ValidatorIndex, error) { + e := slots.ToEpoch(slot) + // The cache uses the state root of the previous epoch - minimum_seed_lookahead last slot as key. (e.g. Starting epoch 1, slot 32, the key would be block root at slot 31) + // For simplicity, the node will skip caching of genesis epoch. if e > params.BeaconConfig().GenesisEpoch+params.BeaconConfig().MinSeedLookahead { - wantedEpoch := time.PrevEpoch(state) - s, err := slots.EpochEnd(wantedEpoch) + s, err := slots.EpochEnd(e - 1) if err != nil { return 0, err } @@ -271,13 +297,17 @@ func BeaconProposerIndex(ctx context.Context, state state.ReadOnlyBeaconState) ( return 0, err } if r != nil && !bytes.Equal(r, params.BeaconConfig().ZeroHash[:]) { - proposerIndices, ok := proposerIndicesCache.ProposerIndices(wantedEpoch+1, bytesutil.ToBytes32(r)) - if ok { - return proposerIndices[state.Slot()%params.BeaconConfig().SlotsPerEpoch], nil + pid, err := cachedProposerIndexAtSlot(slot, [32]byte(r)) + if err == nil { + return pid, nil } - if err := UpdateProposerIndicesInCache(ctx, state, time.CurrentEpoch(state)); err != nil { + if err := UpdateProposerIndicesInCache(ctx, state, e); err != nil { return 0, errors.Wrap(err, "could not update committee cache") } + pid, err = cachedProposerIndexAtSlot(slot, [32]byte(r)) + if err == nil { + return pid, nil + } } } @@ -286,7 +316,7 @@ func BeaconProposerIndex(ctx context.Context, state state.ReadOnlyBeaconState) ( return 0, errors.Wrap(err, "could not generate seed") } - seedWithSlot := append(seed[:], bytesutil.Bytes8(uint64(state.Slot()))...) + seedWithSlot := append(seed[:], bytesutil.Bytes8(uint64(slot))...) seedWithSlotHash := hash.Hash(seedWithSlot) indices, err := ActiveValidatorIndices(ctx, state, e) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 56753f849a..a1012e167e 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -98,7 +98,8 @@ type BeaconNode struct { syncCommitteePool synccommittee.Pool blsToExecPool blstoexec.PoolManager depositCache cache.DepositCache - proposerIdsCache *cache.ProposerPayloadIDsCache + trackedValidatorsCache *cache.TrackedValidatorsCache + payloadIDCache *cache.PayloadIDCache stateFeed *event.Feed blockFeed *event.Feed opFeed *event.Feed @@ -179,10 +180,11 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco slashingsPool: slashings.NewPool(), syncCommitteePool: synccommittee.NewPool(), blsToExecPool: blstoexec.NewPool(), + trackedValidatorsCache: cache.NewTrackedValidatorsCache(), + payloadIDCache: cache.NewPayloadIDCache(), slasherBlockHeadersFeed: new(event.Feed), slasherAttestationsFeed: new(event.Feed), serviceFlagOpts: &serviceFlagOpts{}, - proposerIdsCache: cache.NewProposerPayloadIDsCache(), } beacon.initialSyncComplete = make(chan struct{}) @@ -659,10 +661,11 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st blockchain.WithStateGen(b.stateGen), blockchain.WithSlasherAttestationsFeed(b.slasherAttestationsFeed), blockchain.WithFinalizedStateAtStartUp(b.finalizedStateAtStartUp), - blockchain.WithProposerIdsCache(b.proposerIdsCache), blockchain.WithClockSynchronizer(gs), blockchain.WithSyncComplete(syncComplete), blockchain.WithBlobStorage(b.BlobStorage), + blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache), + blockchain.WithPayloadIDCache(b.payloadIDCache), ) blockchainService, err := blockchain.NewService(b.ctx, opts...) @@ -893,11 +896,12 @@ func (b *BeaconNode) registerRPCService(router *mux.Router) error { StateGen: b.stateGen, EnableDebugRPCEndpoints: enableDebugRPCEndpoints, MaxMsgSize: maxMsgSize, - ProposerIdsCache: b.proposerIdsCache, BlockBuilder: b.fetchBuilderService(), Router: router, ClockWaiter: b.clockWaiter, BlobStorage: b.BlobStorage, + TrackedValidatorsCache: b.trackedValidatorsCache, + PayloadIDCache: b.payloadIDCache, }) return b.services.RegisterService(rpcService) diff --git a/beacon-chain/rpc/eth/validator/BUILD.bazel b/beacon-chain/rpc/eth/validator/BUILD.bazel index 7320ffc0b9..c5678bdedc 100644 --- a/beacon-chain/rpc/eth/validator/BUILD.bazel +++ b/beacon-chain/rpc/eth/validator/BUILD.bazel @@ -19,7 +19,6 @@ go_library( "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/transition:go_default_library", "//beacon-chain/db:go_default_library", - "//beacon-chain/db/kv:go_default_library", "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/synccommittee:go_default_library", "//beacon-chain/p2p:go_default_library", @@ -41,7 +40,6 @@ go_library( "//proto/prysm/v1alpha1:go_default_library", "//runtime/version:go_default_library", "//time/slots:go_default_library", - "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", @@ -91,7 +89,6 @@ go_test( "//testing/require:go_default_library", "//testing/util:go_default_library", "//time/slots:go_default_library", - "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_golang_mock//gomock:go_default_library", "@com_github_gorilla_mux//:go_default_library", diff --git a/beacon-chain/rpc/eth/validator/handlers.go b/beacon-chain/rpc/eth/validator/handlers.go index e2f88a0e01..bdb06e70e6 100644 --- a/beacon-chain/rpc/eth/validator/handlers.go +++ b/beacon-chain/rpc/eth/validator/handlers.go @@ -11,14 +11,12 @@ import ( "strconv" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/builder" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/kv" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core" rpchelpers "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/helpers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared" @@ -543,9 +541,6 @@ func (s *Server) RegisterValidator(w http.ResponseWriter, r *http.Request) { // PrepareBeaconProposer endpoint saves the fee recipient given a validator index, this is used when proposing a block. func (s *Server) PrepareBeaconProposer(w http.ResponseWriter, r *http.Request) { - ctx, span := trace.StartSpan(r.Context(), "validator.PrepareBeaconProposer") - defer span.End() - var jsonFeeRecipients []*shared.FeeRecipient err := json.NewDecoder(r.Body).Decode(&jsonFeeRecipients) switch { @@ -556,7 +551,6 @@ func (s *Server) PrepareBeaconProposer(w http.ResponseWriter, r *http.Request) { httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest) return } - var feeRecipients []common.Address var validatorIndices []primitives.ValidatorIndex // filter for found fee recipients for _, r := range jsonFeeRecipients { @@ -568,28 +562,25 @@ func (s *Server) PrepareBeaconProposer(w http.ResponseWriter, r *http.Request) { if !valid { return } - f, err := s.BeaconDB.FeeRecipientByValidatorID(ctx, primitives.ValidatorIndex(validatorIndex)) - switch { - case errors.Is(err, kv.ErrNotFoundFeeRecipient): - feeRecipients = append(feeRecipients, common.BytesToAddress(bytesutil.SafeCopyBytes(feeRecipientBytes))) - validatorIndices = append(validatorIndices, primitives.ValidatorIndex(validatorIndex)) - case err != nil: - httputil.HandleError(w, fmt.Sprintf("Could not get fee recipient by validator index: %v", err), http.StatusInternalServerError) - return - default: - if common.BytesToAddress(feeRecipientBytes) != f { - feeRecipients = append(feeRecipients, common.BytesToAddress(bytesutil.SafeCopyBytes(feeRecipientBytes))) - validatorIndices = append(validatorIndices, primitives.ValidatorIndex(validatorIndex)) + // Use default address if the burn address is return + feeRecipient := primitives.ExecutionAddress(feeRecipientBytes) + if feeRecipient == primitives.ExecutionAddress([20]byte{}) { + feeRecipient = primitives.ExecutionAddress(params.BeaconConfig().DefaultFeeRecipient) + if feeRecipient == primitives.ExecutionAddress([20]byte{}) { + log.WithField("validatorIndex", validatorIndex).Warn("fee recipient is the burn address") } } + val := cache.TrackedValidator{ + Active: true, // TODO: either check or add the field in the request + Index: primitives.ValidatorIndex(validatorIndex), + FeeRecipient: feeRecipient, + } + s.TrackedValidatorsCache.Set(val) + validatorIndices = append(validatorIndices, primitives.ValidatorIndex(validatorIndex)) } if len(validatorIndices) == 0 { return } - if err := s.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, validatorIndices, feeRecipients); err != nil { - httputil.HandleError(w, fmt.Sprintf("Could not save fee recipients: %v", err), http.StatusInternalServerError) - return - } log.WithFields(log.Fields{ "validatorIndices": validatorIndices, }).Info("Updated fee recipient addresses") @@ -812,7 +803,6 @@ func (s *Server) GetProposerDuties(w http.ResponseWriter, r *http.Request) { pubkey48 := val.PublicKey() pubkey := pubkey48[:] for _, slot := range proposalSlots { - s.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, index, [8]byte{} /* payloadID */, [32]byte{} /* head root */) duties = append(duties, &ProposerDuty{ Pubkey: hexutil.Encode(pubkey), ValidatorIndex: strconv.FormatUint(uint64(index), 10), @@ -821,8 +811,6 @@ func (s *Server) GetProposerDuties(w http.ResponseWriter, r *http.Request) { } } - s.ProposerSlotIndexCache.PrunePayloadIDs(epochStartSlot) - dependentRoot, err := proposalDependentRoot(st, requestedEpoch) if err != nil { httputil.HandleError(w, "Could not get dependent root: "+err.Error(), http.StatusInternalServerError) diff --git a/beacon-chain/rpc/eth/validator/handlers_test.go b/beacon-chain/rpc/eth/validator/handlers_test.go index 480869db1f..9178c520b9 100644 --- a/beacon-chain/rpc/eth/validator/handlers_test.go +++ b/beacon-chain/rpc/eth/validator/handlers_test.go @@ -12,7 +12,6 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/gorilla/mux" "github.com/pkg/errors" @@ -1660,7 +1659,8 @@ func TestGetProposerDuties(t *testing.T) { TimeFetcher: chain, OptimisticModeFetcher: chain, SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), } request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/proposer/{epoch}", nil) @@ -1681,9 +1681,6 @@ func TestGetProposerDuties(t *testing.T) { expectedDuty = duty } } - vid, _, has := s.ProposerSlotIndexCache.GetProposerPayloadIDs(11, [32]byte{}) - require.Equal(t, true, has) - require.Equal(t, primitives.ValidatorIndex(12289), vid) require.NotNil(t, expectedDuty, "Expected duty for slot 11 not found") assert.Equal(t, "12289", expectedDuty.ValidatorIndex) assert.Equal(t, hexutil.Encode(pubKeys[12289]), expectedDuty.Pubkey) @@ -1702,7 +1699,8 @@ func TestGetProposerDuties(t *testing.T) { TimeFetcher: chain, OptimisticModeFetcher: chain, SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), } request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/proposer/{epoch}", nil) @@ -1723,52 +1721,10 @@ func TestGetProposerDuties(t *testing.T) { expectedDuty = duty } } - vid, _, has := s.ProposerSlotIndexCache.GetProposerPayloadIDs(43, [32]byte{}) - require.Equal(t, true, has) - require.Equal(t, primitives.ValidatorIndex(1360), vid) require.NotNil(t, expectedDuty, "Expected duty for slot 43 not found") assert.Equal(t, "1360", expectedDuty.ValidatorIndex) assert.Equal(t, hexutil.Encode(pubKeys[1360]), expectedDuty.Pubkey) }) - t.Run("prune payload ID cache", func(t *testing.T) { - bs, err := transition.GenesisBeaconState(context.Background(), deposits, 0, eth1Data) - require.NoError(t, err, "Could not set up genesis state") - require.NoError(t, bs.SetSlot(params.BeaconConfig().SlotsPerEpoch)) - require.NoError(t, bs.SetBlockRoots(roots)) - chainSlot := params.BeaconConfig().SlotsPerEpoch - chain := &mockChain.ChainService{ - State: bs, Root: genesisRoot[:], Slot: &chainSlot, - } - s := &Server{ - Stater: &testutil.MockStater{StatesBySlot: map[primitives.Slot]state.BeaconState{params.BeaconConfig().SlotsPerEpoch: bs}}, - HeadFetcher: chain, - TimeFetcher: chain, - OptimisticModeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), - } - - s.ProposerSlotIndexCache.SetProposerAndPayloadIDs(1, 1, [8]byte{1}, [32]byte{2}) - s.ProposerSlotIndexCache.SetProposerAndPayloadIDs(31, 2, [8]byte{2}, [32]byte{3}) - s.ProposerSlotIndexCache.SetProposerAndPayloadIDs(32, 4309, [8]byte{3}, [32]byte{4}) - - request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/proposer/{epoch}", nil) - request = mux.SetURLVars(request, map[string]string{"epoch": "1"}) - writer := httptest.NewRecorder() - writer.Body = &bytes.Buffer{} - - s.GetProposerDuties(writer, request) - assert.Equal(t, http.StatusOK, writer.Code) - vid, _, has := s.ProposerSlotIndexCache.GetProposerPayloadIDs(1, [32]byte{}) - require.Equal(t, false, has) - require.Equal(t, primitives.ValidatorIndex(0), vid) - vid, _, has = s.ProposerSlotIndexCache.GetProposerPayloadIDs(2, [32]byte{}) - require.Equal(t, false, has) - require.Equal(t, primitives.ValidatorIndex(0), vid) - vid, _, has = s.ProposerSlotIndexCache.GetProposerPayloadIDs(32, [32]byte{}) - require.Equal(t, true, has) - require.Equal(t, primitives.ValidatorIndex(10565), vid) - }) t.Run("epoch out of bounds", func(t *testing.T) { bs, err := transition.GenesisBeaconState(context.Background(), deposits, 0, eth1Data) require.NoError(t, err, "Could not set up genesis state") @@ -1785,7 +1741,8 @@ func TestGetProposerDuties(t *testing.T) { TimeFetcher: chain, OptimisticModeFetcher: chain, SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), } currentEpoch := slots.ToEpoch(bs.Slot()) @@ -1828,7 +1785,8 @@ func TestGetProposerDuties(t *testing.T) { TimeFetcher: chain, OptimisticModeFetcher: chain, SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), } request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/proposer/{epoch}", nil) @@ -2267,21 +2225,24 @@ func TestPrepareBeaconProposer(t *testing.T) { request := httptest.NewRequest(http.MethodPost, url, &body) writer := httptest.NewRecorder() db := dbutil.SetupDB(t) - ctx := context.Background() server := &Server{ - BeaconDB: db, + BeaconDB: db, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), } server.PrepareBeaconProposer(writer, request) require.Equal(t, tt.code, writer.Code) if tt.wantErr != "" { require.Equal(t, strings.Contains(writer.Body.String(), tt.wantErr), true) } else { - require.NoError(t, err) - address, err := server.BeaconDB.FeeRecipientByValidatorID(ctx, 1) require.NoError(t, err) feebytes, err := hexutil.Decode(tt.request[0].FeeRecipient) require.NoError(t, err) - require.Equal(t, common.BytesToAddress(feebytes), address) + index, err := strconv.ParseUint(tt.request[0].ValidatorIndex, 10, 64) + require.NoError(t, err) + val, tracked := server.TrackedValidatorsCache.Validator(primitives.ValidatorIndex(index)) + require.Equal(t, true, tracked) + require.Equal(t, primitives.ExecutionAddress(feebytes), val.FeeRecipient) } }) } @@ -2292,7 +2253,11 @@ func TestProposer_PrepareBeaconProposerOverlapping(t *testing.T) { db := dbutil.SetupDB(t) // New validator - proposerServer := &Server{BeaconDB: db} + proposerServer := &Server{ + BeaconDB: db, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + } req := []*shared.FeeRecipient{{ FeeRecipient: hexutil.Encode(bytesutil.PadTo([]byte{0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF}, fieldparams.FeeRecipientLength)), ValidatorIndex: "1", @@ -2318,7 +2283,7 @@ func TestProposer_PrepareBeaconProposerOverlapping(t *testing.T) { writer = httptest.NewRecorder() proposerServer.PrepareBeaconProposer(writer, request) require.Equal(t, http.StatusOK, writer.Code) - require.LogsDoNotContain(t, hook, "Updated fee recipient addresses") + require.LogsContain(t, hook, "Updated fee recipient addresses") // Same validator with different fee recipient hook.Reset() @@ -2368,13 +2333,16 @@ func TestProposer_PrepareBeaconProposerOverlapping(t *testing.T) { writer = httptest.NewRecorder() proposerServer.PrepareBeaconProposer(writer, request) require.Equal(t, http.StatusOK, writer.Code) - require.LogsDoNotContain(t, hook, "Updated fee recipient addresses") + require.LogsContain(t, hook, "Updated fee recipient addresses") } func BenchmarkServer_PrepareBeaconProposer(b *testing.B) { db := dbutil.SetupDB(b) - proposerServer := &Server{BeaconDB: db} - + proposerServer := &Server{ + BeaconDB: db, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + } f := bytesutil.PadTo([]byte{0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF}, fieldparams.FeeRecipientLength) recipients := make([]*shared.FeeRecipient, 0) for i := 0; i < 10000; i++ { diff --git a/beacon-chain/rpc/eth/validator/server.go b/beacon-chain/rpc/eth/validator/server.go index 13afa5dadd..d3e1bd448d 100644 --- a/beacon-chain/rpc/eth/validator/server.go +++ b/beacon-chain/rpc/eth/validator/server.go @@ -29,11 +29,12 @@ type Server struct { OptimisticModeFetcher blockchain.OptimisticModeFetcher SyncCommitteePool synccommittee.Pool V1Alpha1Server eth.BeaconNodeValidatorServer - ProposerSlotIndexCache *cache.ProposerPayloadIDsCache ChainInfoFetcher blockchain.ChainInfoFetcher BeaconDB db.HeadAccessDatabase BlockBuilder builder.BlockBuilder OperationNotifier operation.Notifier CoreService *core.Service BlockRewardFetcher rewards.BlockRewardsFetcher + TrackedValidatorsCache *cache.TrackedValidatorsCache + PayloadIDCache *cache.PayloadIDCache } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go index 2299f7811b..4c7d90c9b0 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go @@ -135,7 +135,7 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb. return nil, status.Errorf(codes.Internal, "Could not compute committee assignments: %v", err) } // Query the next epoch assignments for committee subnet subscriptions. - nextCommitteeAssignments, nextProposerIndexToSlots, err := helpers.CommitteeAssignments(ctx, s, req.Epoch+1) + nextCommitteeAssignments, _, err := helpers.CommitteeAssignments(ctx, s, req.Epoch+1) if err != nil { return nil, status.Errorf(codes.Internal, "Could not compute next committee assignments: %v", err) } @@ -178,15 +178,6 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb. nextAssignment.AttesterSlot = ca.AttesterSlot nextAssignment.CommitteeIndex = ca.CommitteeIndex } - // Cache proposer assignment for the current epoch. - for _, slot := range proposerIndexToSlots[idx] { - // Head root is empty because it can't be known until slot - 1. Same with payload id. - vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, [8]byte{} /* payloadID */, [32]byte{} /* head root */) - } - // Cache proposer assignment for the next epoch. - for _, slot := range nextProposerIndexToSlots[idx] { - vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, [8]byte{} /* payloadID */, [32]byte{} /* head root */) - } } else { // If the validator isn't in the beacon state, try finding their deposit to determine their status. // We don't need the lastActiveValidatorFn because we don't use the response in this. @@ -228,9 +219,6 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb. validatorAssignments = append(validatorAssignments, assignment) nextValidatorAssignments = append(nextValidatorAssignments, nextAssignment) } - // Prune payload ID cache for any slots before request slot. - vs.ProposerSlotIndexCache.PrunePayloadIDs(epochStartSlot) - return ðpb.DutiesResponse{ Duties: validatorAssignments, CurrentEpochDuties: validatorAssignments, diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go index d8ccd801c3..245b4e77c7 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go @@ -60,10 +60,10 @@ func TestGetDuties_OK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + PayloadIDCache: cache.NewPayloadIDCache(), } // Test the first validator in registry. @@ -145,11 +145,11 @@ func TestGetAltairDuties_SyncCommitteeOK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now().Add(time.Duration(-1*int64(slot-1)) * time.Second), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockExecution.Chain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockExecution.Chain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + PayloadIDCache: cache.NewPayloadIDCache(), } // Test the first validator in registry. @@ -251,11 +251,11 @@ func TestGetBellatrixDuties_SyncCommitteeOK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now().Add(time.Duration(-1*int64(slot-1)) * time.Second), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockExecution.Chain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockExecution.Chain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + PayloadIDCache: cache.NewPayloadIDCache(), } // Test the first validator in registry. @@ -343,12 +343,12 @@ func TestGetAltairDuties_UnknownPubkey(t *testing.T) { require.NoError(t, err) vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockExecution.Chain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - DepositFetcher: depositCache, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockExecution.Chain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + DepositFetcher: depositCache, + PayloadIDCache: cache.NewPayloadIDCache(), } unknownPubkey := bytesutil.PadTo([]byte{'u'}, 48) @@ -401,10 +401,10 @@ func TestGetDuties_CurrentEpoch_ShouldNotFail(t *testing.T) { State: bState, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + PayloadIDCache: cache.NewPayloadIDCache(), } // Test the first validator in registry. @@ -440,10 +440,10 @@ func TestGetDuties_MultipleKeys_OK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + PayloadIDCache: cache.NewPayloadIDCache(), } pubkey0 := deposits[0].Data.PublicKey @@ -507,12 +507,12 @@ func TestStreamDuties_OK(t *testing.T) { Genesis: time.Now(), } vs := &Server{ - Ctx: ctx, - HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - TimeFetcher: c, - StateNotifier: &mockChain.MockStateNotifier{}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + Ctx: ctx, + HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + TimeFetcher: c, + StateNotifier: &mockChain.MockStateNotifier{}, + PayloadIDCache: cache.NewPayloadIDCache(), } // Test the first validator in registry. @@ -565,12 +565,12 @@ func TestStreamDuties_OK_ChainReorg(t *testing.T) { Genesis: time.Now(), } vs := &Server{ - Ctx: ctx, - HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - TimeFetcher: c, - StateNotifier: &mockChain.MockStateNotifier{}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + Ctx: ctx, + HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + TimeFetcher: c, + StateNotifier: &mockChain.MockStateNotifier{}, + PayloadIDCache: cache.NewPayloadIDCache(), } // Test the first validator in registry. @@ -625,10 +625,9 @@ func BenchmarkCommitteeAssignment(b *testing.B) { chain := &mockChain.ChainService{State: bs, Root: genesisRoot[:]} vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, } // Create request for all validators in the system. diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go index a0a25db5b8..10f1cbaeea 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go @@ -14,6 +14,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v4/beacon-chain/builder" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed" blockfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" @@ -69,6 +70,11 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) ( parentRoot := vs.ForkchoiceFetcher.GetProposerHead() if parentRoot != headRoot { blockchain.LateBlockAttemptedReorgCount.Inc() + log.WithFields(logrus.Fields{ + "slot": req.Slot, + "parentRoot": fmt.Sprintf("%#x", parentRoot), + "headRoot": fmt.Sprintf("%#x", headRoot), + }).Warn("late block attempted reorg failed") } // An optimistic validator MUST NOT produce a block (i.e., sign across the DOMAIN_BEACON_PROPOSER domain). @@ -275,45 +281,36 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign // PrepareBeaconProposer caches and updates the fee recipient for the given proposer. func (vs *Server) PrepareBeaconProposer( - ctx context.Context, request *ethpb.PrepareBeaconProposerRequest, + _ context.Context, request *ethpb.PrepareBeaconProposerRequest, ) (*emptypb.Empty, error) { - ctx, span := trace.StartSpan(ctx, "validator.PrepareBeaconProposer") - defer span.End() - var feeRecipients []common.Address var validatorIndices []primitives.ValidatorIndex - newRecipients := make([]*ethpb.PrepareBeaconProposerRequest_FeeRecipientContainer, 0, len(request.Recipients)) for _, r := range request.Recipients { - f, err := vs.BeaconDB.FeeRecipientByValidatorID(ctx, r.ValidatorIndex) - switch { - case errors.Is(err, kv.ErrNotFoundFeeRecipient): - newRecipients = append(newRecipients, r) - case err != nil: - return nil, status.Errorf(codes.Internal, "Could not get fee recipient by validator index: %v", err) - default: - if common.BytesToAddress(r.FeeRecipient) != f { - newRecipients = append(newRecipients, r) - } - } - } - if len(newRecipients) == 0 { - return &emptypb.Empty{}, nil - } - - for _, recipientContainer := range newRecipients { - recipient := hexutil.Encode(recipientContainer.FeeRecipient) + recipient := hexutil.Encode(r.FeeRecipient) if !common.IsHexAddress(recipient) { return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid fee recipient address: %v", recipient)) } - feeRecipients = append(feeRecipients, common.BytesToAddress(recipientContainer.FeeRecipient)) - validatorIndices = append(validatorIndices, recipientContainer.ValidatorIndex) + // Use default address if the burn address is return + feeRecipient := primitives.ExecutionAddress(r.FeeRecipient) + if feeRecipient == primitives.ExecutionAddress([20]byte{}) { + feeRecipient = primitives.ExecutionAddress(params.BeaconConfig().DefaultFeeRecipient) + if feeRecipient == primitives.ExecutionAddress([20]byte{}) { + log.WithField("validatorIndex", r.ValidatorIndex).Warn("fee recipient is the burn address") + } + } + val := cache.TrackedValidator{ + Active: true, // TODO: either check or add the field in the request + Index: r.ValidatorIndex, + FeeRecipient: feeRecipient, + } + vs.TrackedValidatorsCache.Set(val) + validatorIndices = append(validatorIndices, r.ValidatorIndex) } - if err := vs.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, validatorIndices, feeRecipients); err != nil { - return nil, status.Errorf(codes.Internal, "Could not save fee recipients: %v", err) + if len(validatorIndices) != 0 { + log.WithFields(logrus.Fields{ + "validatorIndices": validatorIndices, + }).Info("Updated fee recipient addresses for validator indices") } - log.WithFields(logrus.Fields{ - "validatorIndices": validatorIndices, - }).Info("Updated fee recipient addresses for validator indices") return &emptypb.Empty{}, nil } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix_test.go index 197ff3c55d..6910095090 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix_test.go @@ -81,9 +81,10 @@ func TestServer_setExecutionData(t *testing.T) { HeadFetcher: &blockchainTest.ChainService{State: capellaTransitionState}, FinalizationFetcher: &blockchainTest.ChainService{}, BeaconDB: beaconDB, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), BlockBuilder: &builderTest.MockBuilderService{HasConfigured: true, Cfg: &builderTest.Config{BeaconDB: beaconDB}}, ForkchoiceFetcher: &blockchainTest.ChainService{}, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), } t.Run("No builder configured. Use local block", func(t *testing.T) { diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go index e262d97123..09cb57a62a 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go @@ -5,14 +5,12 @@ import ( "context" "fmt" - "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/kv" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" @@ -53,44 +51,36 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe slot := blk.Slot() vIdx := blk.ProposerIndex() headRoot := blk.ParentRoot() - proposerID, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot) - feeRecipient := params.BeaconConfig().DefaultFeeRecipient - recipient, err := vs.BeaconDB.FeeRecipientByValidatorID(ctx, vIdx) - switch err == nil { - case true: - feeRecipient = recipient - case errors.As(err, kv.ErrNotFoundFeeRecipient): - // If fee recipient is not found in DB and not set from beacon node CLI, - // use the burn address. - if feeRecipient.String() == params.BeaconConfig().EthBurnAddressHex { - logrus.WithFields(logrus.Fields{ - "validatorIndex": vIdx, - "burnAddress": params.BeaconConfig().EthBurnAddressHex, - }).Warn("Fee recipient is currently using the burn address, " + - "you will not be rewarded transaction fees on this setting. " + - "Please set a different eth address as the fee recipient. " + - "Please refer to our documentation for instructions") - } - default: - return nil, false, errors.Wrap(err, "could not get fee recipient in db") + logFields := logrus.Fields{ + "validatorIndex": vIdx, + "slot": slot, + "headRoot": fmt.Sprintf("%#x", headRoot), + } + payloadId, ok := vs.PayloadIDCache.PayloadID(slot, headRoot) + + val, tracked := vs.TrackedValidatorsCache.Validator(vIdx) + if !tracked { + logrus.WithFields(logFields).Warn("could not find tracked proposer index") } - if ok && proposerID == vIdx && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID. - var pid [8]byte + var err error + if ok && payloadId != [8]byte{} { + // Payload ID is cache hit. Return the cached payload ID. + var pid primitives.PayloadID copy(pid[:], payloadId[:]) payloadIDCacheHit.Inc() payload, bundle, overrideBuilder, err := vs.ExecutionEngineCaller.GetPayload(ctx, pid, slot) switch { case err == nil: bundleCache.add(slot, bundle) - warnIfFeeRecipientDiffers(payload, feeRecipient) + warnIfFeeRecipientDiffers(payload, val.FeeRecipient) return payload, overrideBuilder, nil case errors.Is(err, context.DeadlineExceeded): default: return nil, false, errors.Wrap(err, "could not get cached payload from execution client") } } - + log.WithFields(logFields).Debug("payload ID cache miss") parentHash, err := vs.getParentBlockHash(ctx, st, slot) switch { case errors.Is(err, errActivationNotReached) || errors.Is(err, errNoTerminalBlockHash): @@ -112,7 +102,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe finalizedBlockHash := [32]byte{} justifiedBlockHash := [32]byte{} // Blocks before Bellatrix don't have execution payloads. Use zeros as the hash. - if st.Version() >= version.Altair { + if st.Version() >= version.Bellatrix { finalizedBlockHash = vs.FinalizationFetcher.FinalizedBlockHash() justifiedBlockHash = vs.FinalizationFetcher.UnrealizedJustifiedPayloadBlockHash() } @@ -137,7 +127,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe attr, err = payloadattribute.New(&enginev1.PayloadAttributesV3{ Timestamp: uint64(t.Unix()), PrevRandao: random, - SuggestedFeeRecipient: feeRecipient.Bytes(), + SuggestedFeeRecipient: val.FeeRecipient[:], Withdrawals: withdrawals, ParentBeaconBlockRoot: headRoot[:], }) @@ -152,7 +142,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe attr, err = payloadattribute.New(&enginev1.PayloadAttributesV2{ Timestamp: uint64(t.Unix()), PrevRandao: random, - SuggestedFeeRecipient: feeRecipient.Bytes(), + SuggestedFeeRecipient: val.FeeRecipient[:], Withdrawals: withdrawals, }) if err != nil { @@ -162,7 +152,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe attr, err = payloadattribute.New(&enginev1.PayloadAttributes{ Timestamp: uint64(t.Unix()), PrevRandao: random, - SuggestedFeeRecipient: feeRecipient.Bytes(), + SuggestedFeeRecipient: val.FeeRecipient[:], }) if err != nil { return nil, false, err @@ -182,13 +172,13 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe return nil, false, err } bundleCache.add(slot, bundle) - warnIfFeeRecipientDiffers(payload, feeRecipient) + warnIfFeeRecipientDiffers(payload, val.FeeRecipient) return payload, overrideBuilder, nil } // warnIfFeeRecipientDiffers logs a warning if the fee recipient in the included payload does not // match the requested one. -func warnIfFeeRecipientDiffers(payload interfaces.ExecutionData, feeRecipient common.Address) { +func warnIfFeeRecipientDiffers(payload interfaces.ExecutionData, feeRecipient primitives.ExecutionAddress) { // Warn if the fee recipient is not the value we expect. if payload != nil && !bytes.Equal(payload.FeeRecipient(), feeRecipient[:]) { logrus.WithFields(logrus.Fields{ @@ -299,7 +289,7 @@ func getParentBlockHashPostMerge(st state.BeaconState) ([]byte, error) { if err != nil { return nil, errors.Wrap(err, "could not get post merge payload header") } - return header.ParentHash(), nil + return header.BlockHash(), nil } // getParentBlockHashPreMerge retrieves the parent block hash before the merge has completed. diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go index 58f4ebbe8a..7b157d9610 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go @@ -71,8 +71,6 @@ func TestServer_getExecutionPayload(t *testing.T) { Root: b2rCapella[:], })) - require.NoError(t, beaconDB.SaveFeeRecipientsByValidatorIDs(context.Background(), []primitives.ValidatorIndex{0}, []common.Address{{}})) - tests := []struct { name string st state.BeaconState @@ -86,9 +84,10 @@ func TestServer_getExecutionPayload(t *testing.T) { wantedOverride bool }{ { - name: "transition completed, nil payload id", - st: transitionSt, - errString: "nil payload with block hash", + name: "transition completed, nil payload id", + st: transitionSt, + validatorIndx: 2, + errString: "nil payload with block hash", }, { name: "transition completed, happy case (has fee recipient in Db)", @@ -110,6 +109,7 @@ func TestServer_getExecutionPayload(t *testing.T) { { name: "transition completed, happy case, (payload ID cached)", st: transitionSt, + payloadID: &pb.PayloadIDBytes{0x1}, validatorIndx: 100, }, { @@ -134,6 +134,7 @@ func TestServer_getExecutionPayload(t *testing.T) { st: transitionSt, validatorIndx: 100, override: true, + payloadID: &pb.PayloadIDBytes{0x1}, wantedOverride: true, }, } @@ -149,9 +150,13 @@ func TestServer_getExecutionPayload(t *testing.T) { HeadFetcher: &chainMock.ChainService{State: tt.st}, FinalizationFetcher: &chainMock.ChainService{}, BeaconDB: beaconDB, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + } + vs.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, Index: tt.validatorIndx}) + if tt.payloadID != nil { + vs.PayloadIDCache.Set(tt.st.Slot(), [32]byte{'a'}, [8]byte(*tt.payloadID)) } - vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(tt.st.Slot(), 100, [8]byte{100}, [32]byte{'a'}) blk := util.NewBeaconBlockBellatrix() blk.Block.Slot = tt.st.Slot() blk.Block.ProposerIndex = tt.validatorIndx @@ -192,9 +197,10 @@ func TestServer_getExecutionPayloadContextTimeout(t *testing.T) { ExecutionEngineCaller: &powtesting.EngineClient{PayloadIDBytes: &pb.PayloadIDBytes{}, ErrGetPayload: context.DeadlineExceeded, ExecutionPayload: &pb.ExecutionPayload{}}, HeadFetcher: &chainMock.ChainService{State: nonTransitionSt}, BeaconDB: beaconDB, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), } - vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(nonTransitionSt.Slot(), 100, [8]byte{100}, [32]byte{'a'}) + vs.PayloadIDCache.Set(nonTransitionSt.Slot(), [32]byte{'a'}, [8]byte{100}) blk := util.NewBeaconBlockBellatrix() blk.Block.Slot = nonTransitionSt.Slot() @@ -231,10 +237,6 @@ func TestServer_getExecutionPayload_UnexpectedFeeRecipient(t *testing.T) { })) feeRecipient := common.BytesToAddress([]byte("a")) - require.NoError(t, beaconDB.SaveFeeRecipientsByValidatorIDs(context.Background(), []primitives.ValidatorIndex{0}, []common.Address{ - feeRecipient, - })) - payloadID := &pb.PayloadIDBytes{0x1} payload := emptyPayload() payload.FeeRecipient = feeRecipient[:] @@ -246,8 +248,15 @@ func TestServer_getExecutionPayload_UnexpectedFeeRecipient(t *testing.T) { HeadFetcher: &chainMock.ChainService{State: transitionSt}, FinalizationFetcher: &chainMock.ChainService{}, BeaconDB: beaconDB, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), } + val := cache.TrackedValidator{ + Active: true, + FeeRecipient: primitives.ExecutionAddress(feeRecipient), + Index: 0, + } + vs.TrackedValidatorsCache.Set(val) blk := util.NewBeaconBlockBellatrix() blk.Block.Slot = transitionSt.Slot() @@ -257,6 +266,7 @@ func TestServer_getExecutionPayload_UnexpectedFeeRecipient(t *testing.T) { gotPayload, _, err := vs.getLocalPayload(context.Background(), b.Block(), transitionSt) require.NoError(t, err) require.NotNil(t, gotPayload) + require.Equal(t, common.Address(gotPayload.FeeRecipient()), feeRecipient) // We should NOT be getting the warning. require.LogsDoNotContain(t, hook, "Fee recipient address from execution client is not what was expected") @@ -264,7 +274,7 @@ func TestServer_getExecutionPayload_UnexpectedFeeRecipient(t *testing.T) { evilRecipientAddress := common.BytesToAddress([]byte("evil")) payload.FeeRecipient = evilRecipientAddress[:] - vs.ProposerSlotIndexCache = cache.NewProposerPayloadIDsCache() + vs.PayloadIDCache = cache.NewPayloadIDCache() gotPayload, _, err = vs.getLocalPayload(context.Background(), b.Block(), transitionSt) require.NoError(t, err) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go index 9f740b9fc5..d34acffa5c 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go @@ -195,7 +195,6 @@ func TestServer_GetBeaconBlock_Altair(t *testing.T) { func TestServer_GetBeaconBlock_Bellatrix(t *testing.T) { db := dbutil.SetupDB(t) ctx := context.Background() - hook := logTest.NewGlobal() terminalBlockHash := bytesutil.PadTo([]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, 32) @@ -306,7 +305,6 @@ func TestServer_GetBeaconBlock_Bellatrix(t *testing.T) { assert.DeepEqual(t, randaoReveal, bellatrixBlk.Bellatrix.Body.RandaoReveal, "Expected block to have correct randao reveal") assert.DeepEqual(t, req.Graffiti, bellatrixBlk.Bellatrix.Body.Graffiti, "Expected block to have correct Graffiti") - require.LogsContain(t, hook, "Fee recipient is currently using the burn address") require.DeepEqual(t, payload, bellatrixBlk.Bellatrix.Body.ExecutionPayload) // Payload should equal. // Operator sets default fee recipient to not be burned through beacon node cli. @@ -599,7 +597,8 @@ func getProposerServer(db db.HeadAccessDatabase, headState state.BeaconState, he TimeFetcher: &testutil.MockGenesisTimeFetcher{ Genesis: time.Now(), }, - ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), + PayloadIDCache: cache.NewPayloadIDCache(), + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), BeaconDB: db, BLSChangesPool: blstoexec.NewPool(), BlockBuilder: &builderTest.MockBuilderService{HasConfigured: true}, @@ -2609,16 +2608,19 @@ func TestProposer_PrepareBeaconProposer(t *testing.T) { t.Run(tt.name, func(t *testing.T) { db := dbutil.SetupDB(t) ctx := context.Background() - proposerServer := &Server{BeaconDB: db} + proposerServer := &Server{ + BeaconDB: db, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + } _, err := proposerServer.PrepareBeaconProposer(ctx, tt.args.request) if tt.wantErr != "" { require.ErrorContains(t, tt.wantErr, err) return } require.NoError(t, err) - address, err := proposerServer.BeaconDB.FeeRecipientByValidatorID(ctx, 1) - require.NoError(t, err) - require.Equal(t, common.BytesToAddress(tt.args.request.Recipients[0].FeeRecipient), address) + val, tracked := proposerServer.TrackedValidatorsCache.Validator(1) + require.Equal(t, true, tracked) + require.Equal(t, primitives.ExecutionAddress(tt.args.request.Recipients[0].FeeRecipient), val.FeeRecipient) }) } @@ -2628,7 +2630,10 @@ func TestProposer_PrepareBeaconProposerOverlapping(t *testing.T) { hook := logTest.NewGlobal() db := dbutil.SetupDB(t) ctx := context.Background() - proposerServer := &Server{BeaconDB: db} + proposerServer := &Server{ + BeaconDB: db, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + } // New validator f := bytesutil.PadTo([]byte{0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF}, fieldparams.FeeRecipientLength) @@ -2645,7 +2650,7 @@ func TestProposer_PrepareBeaconProposerOverlapping(t *testing.T) { hook.Reset() _, err = proposerServer.PrepareBeaconProposer(ctx, req) require.NoError(t, err) - require.LogsDoNotContain(t, hook, "Updated fee recipient addresses for validator indices") + require.LogsContain(t, hook, "Updated fee recipient addresses for validator indices") // Same validator with different fee recipient hook.Reset() @@ -2676,14 +2681,16 @@ func TestProposer_PrepareBeaconProposerOverlapping(t *testing.T) { hook.Reset() _, err = proposerServer.PrepareBeaconProposer(ctx, req) require.NoError(t, err) - require.LogsDoNotContain(t, hook, "Updated fee recipient addresses for validator indices") + require.LogsContain(t, hook, "Updated fee recipient addresses for validator indices") } func BenchmarkServer_PrepareBeaconProposer(b *testing.B) { db := dbutil.SetupDB(b) ctx := context.Background() - proposerServer := &Server{BeaconDB: db} - + proposerServer := &Server{ + BeaconDB: db, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + } f := bytesutil.PadTo([]byte{0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF, 0x01, 0xFF}, fieldparams.FeeRecipientLength) recipients := make([]*ethpb.PrepareBeaconProposerRequest_FeeRecipientContainer, 0) for i := 0; i < 10000; i++ { diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/server.go b/beacon-chain/rpc/prysm/v1alpha1/validator/server.go index 16cd7e2bc1..1a33e36b0f 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/server.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/server.go @@ -44,7 +44,8 @@ import ( // and more. type Server struct { Ctx context.Context - ProposerSlotIndexCache *cache.ProposerPayloadIDsCache + PayloadIDCache *cache.PayloadIDCache + TrackedValidatorsCache *cache.TrackedValidatorsCache HeadFetcher blockchain.HeadFetcher ForkFetcher blockchain.ForkFetcher ForkchoiceFetcher blockchain.ForkchoiceFetcher diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 36061b83d3..524e297807 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -128,12 +128,13 @@ type Config struct { StateGen *stategen.State MaxMsgSize int ExecutionEngineCaller execution.EngineCaller - ProposerIdsCache *cache.ProposerPayloadIDsCache OptimisticModeFetcher blockchain.OptimisticModeFetcher BlockBuilder builder.BlockBuilder Router *mux.Router ClockWaiter startup.ClockWaiter BlobStorage *filesystem.BlobStorage + TrackedValidatorsCache *cache.TrackedValidatorsCache + PayloadIDCache *cache.PayloadIDCache } // NewService instantiates a new RPC service instance that will @@ -401,11 +402,12 @@ func (s *Service) Start() { ReplayerBuilder: ch, ExecutionEngineCaller: s.cfg.ExecutionEngineCaller, BeaconDB: s.cfg.BeaconDB, - ProposerSlotIndexCache: s.cfg.ProposerIdsCache, BlockBuilder: s.cfg.BlockBuilder, BLSChangesPool: s.cfg.BLSChangesPool, ClockWaiter: s.cfg.ClockWaiter, CoreService: coreService, + TrackedValidatorsCache: s.cfg.TrackedValidatorsCache, + PayloadIDCache: s.cfg.PayloadIDCache, } s.initializeValidatorServerRoutes(&validator.Server{ HeadFetcher: s.cfg.HeadFetcher, @@ -418,11 +420,12 @@ func (s *Service) Start() { V1Alpha1Server: validatorServer, Stater: stater, SyncCommitteePool: s.cfg.SyncCommitteeObjectPool, - ProposerSlotIndexCache: s.cfg.ProposerIdsCache, ChainInfoFetcher: s.cfg.ChainInfoFetcher, BeaconDB: s.cfg.BeaconDB, BlockBuilder: s.cfg.BlockBuilder, OperationNotifier: s.cfg.OperationNotifier, + TrackedValidatorsCache: s.cfg.TrackedValidatorsCache, + PayloadIDCache: s.cfg.PayloadIDCache, CoreService: coreService, BlockRewardFetcher: rewardFetcher, }) diff --git a/consensus-types/primitives/BUILD.bazel b/consensus-types/primitives/BUILD.bazel index 14e953e0d1..76d7f59846 100644 --- a/consensus-types/primitives/BUILD.bazel +++ b/consensus-types/primitives/BUILD.bazel @@ -6,6 +6,8 @@ go_library( "committee_index.go", "domain.go", "epoch.go", + "execution_address.go", + "payload_id.go", "randao.go", "slot.go", "sszbytes.go", diff --git a/consensus-types/primitives/execution_address.go b/consensus-types/primitives/execution_address.go new file mode 100644 index 0000000000..166532fcb4 --- /dev/null +++ b/consensus-types/primitives/execution_address.go @@ -0,0 +1,3 @@ +package primitives + +type ExecutionAddress [20]byte diff --git a/consensus-types/primitives/payload_id.go b/consensus-types/primitives/payload_id.go new file mode 100644 index 0000000000..7898f67aae --- /dev/null +++ b/consensus-types/primitives/payload_id.go @@ -0,0 +1,4 @@ +package primitives + +// PayloadID represents an execution engine Payload ID +type PayloadID [8]byte diff --git a/testing/spectest/shared/common/forkchoice/service.go b/testing/spectest/shared/common/forkchoice/service.go index 1c788d227b..f506224d01 100644 --- a/testing/spectest/shared/common/forkchoice/service.go +++ b/testing/spectest/shared/common/forkchoice/service.go @@ -68,7 +68,8 @@ func startChainService(t testing.TB, blockchain.WithStateNotifier(&mock.MockStateNotifier{}), blockchain.WithAttestationPool(attestations.NewPool()), blockchain.WithDepositCache(depositCache), - blockchain.WithProposerIdsCache(cache.NewProposerPayloadIDsCache()), + blockchain.WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()), + blockchain.WithPayloadIDCache(cache.NewPayloadIDCache()), blockchain.WithClockSynchronizer(startup.NewClockSynchronizer()), blockchain.WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), ) diff --git a/validator/client/BUILD.bazel b/validator/client/BUILD.bazel index 16205a9ac0..223a3f16a9 100644 --- a/validator/client/BUILD.bazel +++ b/validator/client/BUILD.bazel @@ -32,7 +32,6 @@ go_library( "//beacon-chain/core/altair:go_default_library", "//beacon-chain/core/signing:go_default_library", "//cache/lru:go_default_library", - "//cmd/validator/flags:go_default_library", "//config/features:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", diff --git a/validator/client/runner.go b/validator/client/runner.go index 13857ee78e..080c17f70e 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -7,7 +7,6 @@ import ( "time" "github.com/pkg/errors" - "github.com/prysmaticlabs/prysm/v4/cmd/validator/flags" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -55,25 +54,20 @@ func run(ctx context.Context, v iface.Validator) { sub := km.SubscribeAccountChanges(accountsChangedChan) // check if proposer settings is still nil // Set properties on the beacon node like the fee recipient for validators that are being used & active. - if v.ProposerSettings() != nil { - log.Infof("Validator client started with provided proposer settings that sets options such as fee recipient"+ - " and will periodically update the beacon node and custom builder (if --%s)", flags.EnableBuilderFlag.Name) - deadline := time.Now().Add(5 * time.Minute) - if err := v.PushProposerSettings(ctx, km, headSlot, deadline); err != nil { - if errors.Is(err, ErrBuilderValidatorRegistration) { - log.WithError(err).Warn("Push proposer settings error") - } else { - log.WithError(err).Fatal("Failed to update proposer settings") // allow fatal. skipcq - } - } - } else { + if v.ProposerSettings() == nil { log.Warn("Validator client started without proposer settings such as fee recipient" + " and will continue to use settings provided in the beacon node.") } - + deadline := time.Now().Add(5 * time.Minute) + if err := v.PushProposerSettings(ctx, km, headSlot, deadline); err != nil { + if errors.Is(err, ErrBuilderValidatorRegistration) { + log.WithError(err).Warn("Push proposer settings error") + } else { + log.WithError(err).Fatal("Failed to update proposer settings") // allow fatal. skipcq + } + } for { ctx, span := trace.StartSpan(ctx, "validator.processSlot") - select { case <-ctx.Done(): log.Info("Context canceled, stopping validator") @@ -108,7 +102,7 @@ func run(ctx context.Context, v iface.Validator) { // call push proposer setting at the start of each epoch to account for the following edge case: // proposer is activated at the start of epoch and tries to propose immediately - if slots.IsEpochStart(slot) && v.ProposerSettings() != nil { + if slots.IsEpochStart(slot) { go func() { // deadline set for 1 epoch from call to not overlap. epochDeadline := v.SlotDeadline(slot + params.BeaconConfig().SlotsPerEpoch - 1) diff --git a/validator/client/validator.go b/validator/client/validator.go index 04f6793e7c..8034f9170b 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -1028,14 +1028,10 @@ func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKey return err } - signedRegReqs, err := v.buildSignedRegReqs(ctx, filteredKeys, km.Sign) - if err != nil { - return err - } + signedRegReqs := v.buildSignedRegReqs(ctx, filteredKeys, km.Sign) if err := SubmitValidatorRegistrations(ctx, v.validatorClient, signedRegReqs, v.validatorRegBatchSize); err != nil { return errors.Wrap(ErrBuilderValidatorRegistration, err.Error()) } - return nil } @@ -1089,12 +1085,10 @@ func (v *validator) buildPrepProposerReqs(ctx context.Context, pubkeys [][fieldp for _, k := range pubkeys { // Default case: Define fee recipient to burn address var feeRecipient common.Address - isFeeRecipientDefined := false // If fee recipient is defined in default configuration, use it if v.ProposerSettings() != nil && v.ProposerSettings().DefaultConfig != nil && v.ProposerSettings().DefaultConfig.FeeRecipientConfig != nil { feeRecipient = v.ProposerSettings().DefaultConfig.FeeRecipientConfig.FeeRecipient // Use cli config for fee recipient. - isFeeRecipientDefined = true } // If fee recipient is defined for this specific pubkey in proposer configuration, use it @@ -1103,7 +1097,6 @@ func (v *validator) buildPrepProposerReqs(ctx context.Context, pubkeys [][fieldp if ok && config != nil && config.FeeRecipientConfig != nil { feeRecipient = config.FeeRecipientConfig.FeeRecipient // Use file config for fee recipient. - isFeeRecipientDefined = true } } @@ -1112,29 +1105,22 @@ func (v *validator) buildPrepProposerReqs(ctx context.Context, pubkeys [][fieldp continue } - if isFeeRecipientDefined { - prepareProposerReqs = append(prepareProposerReqs, ðpb.PrepareBeaconProposerRequest_FeeRecipientContainer{ - ValidatorIndex: validatorIndex, - FeeRecipient: feeRecipient[:], - }) - - if hexutil.Encode(feeRecipient.Bytes()) == params.BeaconConfig().EthBurnAddressHex { - log.WithFields(logrus.Fields{ - "validatorIndex": validatorIndex, - "feeRecipient": feeRecipient, - }).Warn("Fee recipient is burn address") - } - } + prepareProposerReqs = append(prepareProposerReqs, ðpb.PrepareBeaconProposerRequest_FeeRecipientContainer{ + ValidatorIndex: validatorIndex, + FeeRecipient: feeRecipient[:], + }) } return prepareProposerReqs, nil } -func (v *validator) buildSignedRegReqs(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte /* only active pubkeys */, signer iface.SigningFunc) ([]*ethpb.SignedValidatorRegistrationV1, error) { +func (v *validator) buildSignedRegReqs(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte /* only active pubkeys */, signer iface.SigningFunc) []*ethpb.SignedValidatorRegistrationV1 { var signedValRegRegs []*ethpb.SignedValidatorRegistrationV1 - + if v.ProposerSettings() == nil { + return signedValRegRegs + } // if the timestamp is pre-genesis, don't create registrations if v.genesisTime > uint64(time.Now().UTC().Unix()) { - return signedValRegRegs, nil + return signedValRegRegs } for i, k := range pubkeys { feeRecipient := common.HexToAddress(params.BeaconConfig().EthBurnAddressHex) @@ -1203,7 +1189,7 @@ func (v *validator) buildSignedRegReqs(ctx context.Context, pubkeys [][fieldpara }).Warn("Fee recipient is burn address") } } - return signedValRegRegs, nil + return signedValRegRegs } func (v *validator) validatorIndex(ctx context.Context, pubkey [fieldparams.BLSPubkeyLength]byte) (primitives.ValidatorIndex, bool, error) { diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 375d149363..8ea9778480 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -1862,8 +1862,7 @@ func TestValidator_PushProposerSettings(t *testing.T) { if tt.feeRecipientMap != nil { feeRecipients, err := v.buildPrepProposerReqs(ctx, pubkeys) require.NoError(t, err) - signedRegisterValidatorRequests, err := v.buildSignedRegReqs(ctx, pubkeys, km.Sign) - require.NoError(t, err) + signedRegisterValidatorRequests := v.buildSignedRegReqs(ctx, pubkeys, km.Sign) for _, recipient := range feeRecipients { require.Equal(t, strings.ToLower(tt.feeRecipientMap[recipient.ValidatorIndex]), strings.ToLower(hexutil.Encode(recipient.FeeRecipient))) } @@ -1935,6 +1934,7 @@ func TestValidator_buildPrepProposerReqs_WithoutDefaultConfig(t *testing.T) { feeRecipient1 := getFeeRecipientFromString(t, "0x1111111111111111111111111111111111111111") feeRecipient2 := getFeeRecipientFromString(t, "0x0000000000000000000000000000000000000000") feeRecipient3 := getFeeRecipientFromString(t, "0x3333333333333333333333333333333333333333") + feeRecipient4 := common.Address{} ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1984,6 +1984,11 @@ func TestValidator_buildPrepProposerReqs_WithoutDefaultConfig(t *testing.T) { FeeRecipient: feeRecipient3, }, }, + pubkey4: { + FeeRecipientConfig: &validatorserviceconfig.FeeRecipientConfig{ + FeeRecipient: feeRecipient4, + }, + }, }, }, pubkeyToValidatorIndex: map[[48]byte]primitives.ValidatorIndex{ @@ -2003,6 +2008,10 @@ func TestValidator_buildPrepProposerReqs_WithoutDefaultConfig(t *testing.T) { ValidatorIndex: 2, FeeRecipient: feeRecipient2[:], }, + { + ValidatorIndex: 4, + FeeRecipient: feeRecipient4[:], + }, } filteredKeys, err := v.filterAndCacheActiveKeys(ctx, pubkeys, 0) require.NoError(t, err) @@ -2256,8 +2265,7 @@ func TestValidator_buildSignedRegReqs_DefaultConfigDisabled(t *testing.T) { v.pubkeyToValidatorIndex[pubkey1] = primitives.ValidatorIndex(1) v.pubkeyToValidatorIndex[pubkey2] = primitives.ValidatorIndex(2) v.pubkeyToValidatorIndex[pubkey3] = primitives.ValidatorIndex(3) - actual, err := v.buildSignedRegReqs(ctx, pubkeys, signer) - require.NoError(t, err) + actual := v.buildSignedRegReqs(ctx, pubkeys, signer) assert.Equal(t, 1, len(actual)) assert.DeepEqual(t, feeRecipient1[:], actual[0].Message.FeeRecipient) @@ -2342,8 +2350,7 @@ func TestValidator_buildSignedRegReqs_DefaultConfigEnabled(t *testing.T) { v.pubkeyToValidatorIndex[pubkey1] = primitives.ValidatorIndex(1) v.pubkeyToValidatorIndex[pubkey2] = primitives.ValidatorIndex(2) v.pubkeyToValidatorIndex[pubkey3] = primitives.ValidatorIndex(3) - actual, err := v.buildSignedRegReqs(ctx, pubkeys, signer) - require.NoError(t, err) + actual := v.buildSignedRegReqs(ctx, pubkeys, signer) assert.Equal(t, 2, len(actual)) @@ -2391,9 +2398,7 @@ func TestValidator_buildSignedRegReqs_SignerOnError(t *testing.T) { return nil, errors.New("custom error") } - actual, err := v.buildSignedRegReqs(ctx, pubkeys, signer) - require.NoError(t, err) - + actual := v.buildSignedRegReqs(ctx, pubkeys, signer) assert.Equal(t, 0, len(actual)) } @@ -2449,8 +2454,6 @@ func TestValidator_buildSignedRegReqs_TimestampBeforeGenesis(t *testing.T) { return signature, nil } v.pubkeyToValidatorIndex[pubkey1] = primitives.ValidatorIndex(1) - actual, err := v.buildSignedRegReqs(ctx, pubkeys, signer) - require.NoError(t, err) - + actual := v.buildSignedRegReqs(ctx, pubkeys, signer) assert.Equal(t, 0, len(actual)) }