Compare commits

...

12 Commits

Author SHA1 Message Date
terence tsao
297e77bf31 Move fire payload attribute event to after save head 2025-03-06 13:23:39 -08:00
kasey
773bf3ffbf Merge branch 'develop' into event-slot-fix 2025-03-05 15:28:17 -06:00
Kasey Kirkham
33f362305a Radek feedback 2025-03-05 15:27:19 -06:00
kasey
217ac4f6eb Apply suggestions from code review
Radek's PR suggestion to fix error/log capitalization.

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2025-03-05 15:20:42 -06:00
Kasey Kirkham
3d1b760b2c nil check the Reader coming from the lazyReader 2025-03-03 14:56:01 -06:00
james-prysm
c14caa947f fixing ineffectual assignment 2025-03-03 14:00:53 -06:00
james-prysm
0aaaec351f gaz 2025-03-03 13:42:26 -06:00
james-prysm
0440b96cb3 Merge branch 'develop' into event-slot-fix 2025-03-03 13:36:48 -06:00
james-prysm
8aa03469d3 adding in clarifying comments, uncommenting needs fill, adding in happy path unit test for code coverage 2025-03-03 13:36:29 -06:00
james-prysm
780c6a6b3b Merge branch 'develop' into event-slot-fix 2025-02-28 09:11:04 -06:00
Kasey Kirkham
342c23a56a Skip executing nil lazyReaders 2025-02-24 15:52:25 -06:00
Kasey Kirkham
bc62d17862 payload attribute computations in event handler 2025-02-19 15:54:01 -06:00
7 changed files with 210 additions and 93 deletions

View File

@@ -72,7 +72,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
if arg.attributes == nil {
arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version())
}
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), arg)
payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes)
if err != nil {
switch {
@@ -159,6 +158,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
log.WithFields(logrus.Fields{
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(arg.headRoot[:])),
"headSlot": headBlk.Slot(),
"nextSlot": nextSlot,
"payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])),
}).Info("Forkchoice updated with payload attributes for proposal")
s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId)
@@ -166,40 +166,19 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
log.WithFields(logrus.Fields{
"blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()),
"slot": headBlk.Slot(),
"nextSlot": nextSlot,
}).Error("Received nil payload ID on VALID engine response")
}
return payloadID, nil
}
func firePayloadAttributesEvent(ctx context.Context, f event.SubscriberSender, cfg *fcuConfig) {
pidx, err := helpers.BeaconProposerIndex(ctx, cfg.headState)
if err != nil {
log.WithError(err).
WithField("head_root", cfg.headRoot[:]).
Error("Could not get proposer index for PayloadAttributes event")
return
}
evd := payloadattribute.EventData{
ProposerIndex: pidx,
ProposalSlot: cfg.headState.Slot(),
ParentBlockRoot: cfg.headRoot[:],
Attributer: cfg.attributes,
HeadRoot: cfg.headRoot,
HeadState: cfg.headState,
HeadBlock: cfg.headBlock,
}
if cfg.headBlock != nil && !cfg.headBlock.IsNil() {
headPayload, err := cfg.headBlock.Block().Body().Execution()
if err != nil {
log.WithError(err).Error("Could not get execution payload for head block")
return
}
evd.ParentBlockHash = headPayload.BlockHash()
evd.ParentBlockNumber = headPayload.BlockNumber()
}
func firePayloadAttributesEvent(_ context.Context, f event.SubscriberSender, nextSlot primitives.Slot) {
// the fcu args have differing amounts of completeness based on the code path,
// and there is work we only want to do if a client is actually listening to the events beacon api endpoint.
// temporary solution: just fire a blank event and fill in the details in the api handler.
f.Send(&feed.Event{
Type: statefeed.PayloadAttributes,
Data: evd,
Data: payloadattribute.EventData{ProposalSlot: nextSlot},
})
}

View File

@@ -102,6 +102,8 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo
log.WithError(err).Error("could not save head")
}
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)
// Only need to prune attestations from pool if the head has changed.
if err := s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock); err != nil {
log.WithError(err).Error("could not prune attestations from pool")

View File

@@ -723,13 +723,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:])
// return early if we are not proposing next slot
if attribute.IsEmpty() {
fcuArgs := &fcuConfig{
headState: headState,
headRoot: headRoot,
headBlock: nil,
attributes: attribute,
}
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), fcuArgs)
// notifyForkchoiceUpdate fires the payload attribute event. But in this case, we won't
// call notifyForkchoiceUpdate, so the event is fired here.
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)
return
}

View File

@@ -18,7 +18,8 @@ go_library(
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/payload-attribute:go_default_library",
@@ -58,6 +59,7 @@ go_test(
"//consensus-types/interfaces:go_default_library",
"//consensus-types/payload-attribute:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"runtime/debug"
"strconv"
"time"
@@ -20,7 +21,8 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
chaintime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute"
@@ -352,9 +354,18 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea
if r := recover(); r != nil {
log.WithField("panic", r).Error("Recovered from panic while writing event to client.")
err = errWriterUnusable
debug.PrintStack()
}
}()
if lr == nil {
log.Warn("Event stream skipping a nil lazy event reader callback")
return nil
}
r := lr()
if r == nil {
log.Warn("Event stream skipping a nil event reader")
return nil
}
out, err := io.ReadAll(r)
if err != nil {
return err
@@ -600,27 +611,14 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
var errUnsupportedPayloadAttribute = errors.New("cannot compute payload attributes pre-Bellatrix")
func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.Attributer, error) {
v := ev.HeadState.Version()
func (s *Server) computePayloadAttributes(ctx context.Context, st state.ReadOnlyBeaconState, root [32]byte, proposer primitives.ValidatorIndex, timestamp uint64, randao []byte) (payloadattribute.Attributer, error) {
v := st.Version()
if v < version.Bellatrix {
return nil, errors.Wrapf(errUnsupportedPayloadAttribute, "%s is not supported", version.String(v))
}
t, err := slots.ToTime(ev.HeadState.GenesisTime(), ev.HeadState.Slot())
if err != nil {
return nil, errors.Wrap(err, "could not get head state slot time")
}
timestamp := uint64(t.Unix())
prevRando, err := helpers.RandaoMix(ev.HeadState, chaintime.CurrentEpoch(ev.HeadState))
if err != nil {
return nil, errors.Wrap(err, "could not get head state randao mix")
}
proposerIndex, err := helpers.BeaconProposerIndex(ctx, ev.HeadState)
if err != nil {
return nil, errors.Wrap(err, "could not get head state proposer index")
}
feeRecpt := params.BeaconConfig().DefaultFeeRecipient.Bytes()
tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex)
tValidator, exists := s.TrackedValidatorsCache.Validator(proposer)
if exists {
feeRecpt = tValidator.FeeRecipient[:]
}
@@ -628,34 +626,30 @@ func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribu
if v == version.Bellatrix {
return payloadattribute.New(&engine.PayloadAttributes{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
})
}
w, _, err := ev.HeadState.ExpectedWithdrawals()
w, _, err := st.ExpectedWithdrawals()
if err != nil {
return nil, errors.Wrap(err, "could not get withdrawals from head state")
}
if v == version.Capella {
return payloadattribute.New(&engine.PayloadAttributesV2{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
Withdrawals: w,
})
}
pr, err := ev.HeadBlock.Block().HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not compute head block root")
}
return payloadattribute.New(&engine.PayloadAttributesV3{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
Withdrawals: w,
ParentBeaconBlockRoot: pr[:],
ParentBeaconBlockRoot: root[:],
})
}
@@ -665,37 +659,75 @@ type asyncPayloadAttrData struct {
err error
}
var zeroRoot [32]byte
// needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have
// all of the checked fields empty, so the logical short circuit should hit immediately.
func needsFill(ev payloadattribute.EventData) bool {
return ev.HeadState == nil || ev.HeadState.IsNil() || ev.HeadState.LatestBlockHeader() == nil ||
ev.HeadBlock == nil || ev.HeadBlock.IsNil() ||
ev.HeadRoot == zeroRoot || len(ev.ParentBlockRoot) == 0 || len(ev.ParentBlockHash) == 0 ||
ev.Attributer == nil || ev.Attributer.IsEmpty()
}
func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) {
if ev.HeadBlock == nil || ev.HeadBlock.IsNil() {
hb, err := s.HeadFetcher.HeadBlock(ctx)
if err != nil {
return ev, errors.Wrap(err, "Could not look up head block")
}
root, err := hb.Block().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "Could not compute head block root")
}
if ev.HeadRoot != root {
return ev, errors.Wrap(err, "head root changed before payload attribute event handler execution")
}
ev.HeadBlock = hb
payload, err := hb.Block().Body().Execution()
if err != nil {
return ev, errors.Wrap(err, "Could not get execution payload for head block")
}
ev.ParentBlockHash = payload.BlockHash()
ev.ParentBlockNumber = payload.BlockNumber()
var err error
if !needsFill(ev) {
return ev, nil
}
attr := ev.Attributer
if attr == nil || attr.IsEmpty() {
attr, err := s.computePayloadAttributes(ctx, ev)
if err != nil {
return ev, errors.Wrap(err, "Could not compute payload attributes")
}
ev.Attributer = attr
ev.HeadState, err = s.HeadFetcher.HeadState(ctx)
if err != nil {
return ev, errors.Wrap(err, "could not get head state")
}
return ev, nil
ev.HeadBlock, err = s.HeadFetcher.HeadBlock(ctx)
if err != nil {
return ev, errors.Wrap(err, "could not look up head block")
}
ev.HeadRoot, err = ev.HeadBlock.Block().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "could not compute head block root")
}
pr := ev.HeadBlock.Block().ParentRoot()
ev.ParentBlockRoot = pr[:]
hsr, err := ev.HeadState.LatestBlockHeader().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "could not compute latest block header root")
}
pse := slots.ToEpoch(ev.ProposalSlot)
st := ev.HeadState
if slots.ToEpoch(st.Slot()) != pse {
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, hsr[:], ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch")
}
}
ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "failed to compute proposer index")
}
randao, err := helpers.RandaoMix(st, pse)
if err != nil {
return ev, errors.Wrap(err, "could not get head state randado")
}
payload, err := ev.HeadBlock.Block().Body().Execution()
if err != nil {
return ev, errors.Wrap(err, "could not get execution payload for head block")
}
ev.ParentBlockHash = payload.BlockHash()
ev.ParentBlockNumber = payload.BlockNumber()
t, err := slots.ToTime(st.GenesisTime(), ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "could not get head state slot time")
}
ev.Attributer, err = s.computePayloadAttributes(ctx, st, hsr, ev.ProposerIndex, uint64(t.Unix()), randao)
return ev, err
}
// This event stream is intended to be used by builders and relays.
@@ -704,10 +736,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
ctx, cancel := context.WithTimeout(ctx, payloadAttributeTimeout)
edc := make(chan asyncPayloadAttrData)
go func() {
d := asyncPayloadAttrData{
version: version.String(ev.HeadState.Version()),
}
d := asyncPayloadAttrData{}
defer func() {
edc <- d
}()
@@ -716,6 +745,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
d.err = errors.Wrap(err, "Could not fill event data")
return
}
d.version = version.String(ev.HeadBlock.Version())
attributesBytes, err := marshalAttributes(ev.Attributer)
if err != nil {
d.err = errors.Wrap(err, "errors marshaling payload attributes to json")

View File

@@ -2,6 +2,7 @@ package events
import (
"context"
"encoding/binary"
"fmt"
"io"
"math"
@@ -24,6 +25,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/eth/v1"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/require"
@@ -557,6 +559,110 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
})
}
func TestFillEventData(t *testing.T) {
ctx := context.Background()
t.Run("AlreadyFilledData_ShouldShortCircuitWithoutError", func(t *testing.T) {
st, err := util.NewBeaconStateBellatrix()
require.NoError(t, err)
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockBellatrix(&eth.SignedBeaconBlockBellatrix{}))
require.NoError(t, err)
attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{
Timestamp: uint64(time.Now().Unix()),
})
require.NoError(t, err)
alreadyFilled := payloadattribute.EventData{
HeadState: st,
HeadBlock: b,
HeadRoot: [32]byte{1, 2, 3},
Attributer: attributor,
ParentBlockRoot: []byte{1, 2, 3},
ParentBlockHash: []byte{4, 5, 6},
}
srv := &Server{} // No real HeadFetcher needed here since it won't be called.
result, err := srv.fillEventData(ctx, alreadyFilled)
require.NoError(t, err)
require.DeepEqual(t, alreadyFilled, result)
})
t.Run("Electra PartialData_ShouldFetchHeadStateAndBlock", func(t *testing.T) {
st, err := util.NewBeaconStateElectra()
require.NoError(t, err)
valCount := 10
setActiveValidators(t, st, valCount)
inactivityScores := make([]uint64, valCount)
for i := range inactivityScores {
inactivityScores[i] = 10
}
require.NoError(t, st.SetInactivityScores(inactivityScores))
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockElectra(&eth.SignedBeaconBlockElectra{}))
require.NoError(t, err)
attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{
Timestamp: uint64(time.Now().Unix()),
})
require.NoError(t, err)
// Create an event data object missing certain fields:
partial := payloadattribute.EventData{
// The presence of a nil HeadState, nil HeadBlock, zeroed HeadRoot, etc.
// will cause fillEventData to try to fill the values.
ProposalSlot: 42, // different epoch from current slot
Attributer: attributor, // Must be Bellatrix or later
}
currentSlot := primitives.Slot(0)
// to avoid slot processing
require.NoError(t, st.SetSlot(currentSlot+1))
mockChainService := &mockChain.ChainService{
Root: make([]byte, 32),
State: st,
Block: b,
Slot: &currentSlot,
}
stn := mockChain.NewEventFeedWrapper()
opn := mockChain.NewEventFeedWrapper()
srv := &Server{
StateNotifier: &mockChain.SimpleNotifier{Feed: stn},
OperationNotifier: &mockChain.SimpleNotifier{Feed: opn},
HeadFetcher: mockChainService,
ChainInfoFetcher: mockChainService,
TrackedValidatorsCache: cache.NewTrackedValidatorsCache(),
EventWriteTimeout: testEventWriteTimeout,
}
filled, err := srv.fillEventData(ctx, partial)
require.NoError(t, err, "expected successful fill of partial event data")
// Verify that fields have been updated from the mock data:
require.NotNil(t, filled.HeadState, "HeadState should be assigned")
require.NotNil(t, filled.HeadBlock, "HeadBlock should be assigned")
require.NotEqual(t, [32]byte{}, filled.HeadRoot, "HeadRoot should no longer be zero")
require.NotEmpty(t, filled.ParentBlockRoot, "ParentBlockRoot should be filled")
require.NotEmpty(t, filled.ParentBlockHash, "ParentBlockHash should be filled")
require.Equal(t, uint64(0), filled.ParentBlockNumber, "ParentBlockNumber must match mock block")
// Check that a valid Attributer was set:
require.NotNil(t, filled.Attributer, "Should have a valid payload attributes object")
require.Equal(t, false, filled.Attributer.IsEmpty(), "Attributer should not be empty after fill")
})
}
func setActiveValidators(t *testing.T, st state.BeaconState, count int) {
balances := make([]uint64, count)
validators := make([]*eth.Validator, 0, count)
for i := 0; i < count; i++ {
pubKey := make([]byte, params.BeaconConfig().BLSPubkeyLength)
binary.LittleEndian.PutUint64(pubKey, uint64(i))
balances[i] = uint64(i)
validators = append(validators, &eth.Validator{
PublicKey: pubKey,
ActivationEpoch: 0,
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
WithdrawalCredentials: make([]byte, 32),
})
}
require.NoError(t, st.SetValidators(validators))
require.NoError(t, st.SetBalances(balances))
}
func TestStuckReaderScenarios(t *testing.T) {
cases := []struct {
name string

View File

@@ -0,0 +1,2 @@
### Fixed
- Fixed a bug in the event stream handler when processing payload attribute events where the timestamp and slot of the event would be based on the head rather than the current slot.