include block in attr event and use stategen (#15213)

* include block in attr event and use stategen

* use no-copy state cache for proposal in same epoch

* only advance to the start of epoch

---------

Co-authored-by: Kasey <kasey@users.noreply.github.com>
This commit is contained in:
kasey
2025-04-25 10:11:38 -05:00
committed by GitHub
parent f7eddedd1d
commit c223957751
12 changed files with 61 additions and 51 deletions

View File

@@ -184,13 +184,13 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
return payloadID, nil return payloadID, nil
} }
func firePayloadAttributesEvent(_ context.Context, f event.SubscriberSender, nextSlot primitives.Slot) { func firePayloadAttributesEvent(f event.SubscriberSender, block interfaces.ReadOnlySignedBeaconBlock, root [32]byte, nextSlot primitives.Slot) {
// the fcu args have differing amounts of completeness based on the code path, // the fcu args have differing amounts of completeness based on the code path,
// and there is work we only want to do if a client is actually listening to the events beacon api endpoint. // and there is work we only want to do if a client is actually listening to the events beacon api endpoint.
// temporary solution: just fire a blank event and fill in the details in the api handler. // temporary solution: just fire a blank event and fill in the details in the api handler.
f.Send(&feed.Event{ f.Send(&feed.Event{
Type: statefeed.PayloadAttributes, Type: statefeed.PayloadAttributes,
Data: payloadattribute.EventData{ProposalSlot: nextSlot}, Data: payloadattribute.EventData{HeadBlock: block, HeadRoot: root, ProposalSlot: nextSlot},
}) })
} }

View File

@@ -102,7 +102,7 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo
log.WithError(err).Error("could not save head") log.WithError(err).Error("could not save head")
} }
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1) go firePayloadAttributesEvent(s.cfg.StateNotifier.StateFeed(), args.headBlock, args.headRoot, s.CurrentSlot()+1)
// Only need to prune attestations from pool if the head has changed. // Only need to prune attestations from pool if the head has changed.
s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock) s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock)

View File

@@ -729,9 +729,13 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:]) attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:])
// return early if we are not proposing next slot // return early if we are not proposing next slot
if attribute.IsEmpty() { if attribute.IsEmpty() {
headBlock, err := s.headBlock()
if err != nil {
log.WithError(err).WithField("head_root", headRoot).Error("unable to retrieve head block to fire payload attributes event")
}
// notifyForkchoiceUpdate fires the payload attribute event. But in this case, we won't // notifyForkchoiceUpdate fires the payload attribute event. But in this case, we won't
// call notifyForkchoiceUpdate, so the event is fired here. // call notifyForkchoiceUpdate, so the event is fired here.
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1) go firePayloadAttributesEvent(s.cfg.StateNotifier.StateFeed(), headBlock, headRoot, s.CurrentSlot()+1)
return return
} }

View File

@@ -1041,6 +1041,7 @@ func (s *Service) eventsEndpoints() []endpoint {
HeadFetcher: s.cfg.HeadFetcher, HeadFetcher: s.cfg.HeadFetcher,
ChainInfoFetcher: s.cfg.ChainInfoFetcher, ChainInfoFetcher: s.cfg.ChainInfoFetcher,
TrackedValidatorsCache: s.cfg.TrackedValidatorsCache, TrackedValidatorsCache: s.cfg.TrackedValidatorsCache,
StateGen: s.cfg.StateGen,
} }
const namespace = "events" const namespace = "events"

View File

@@ -20,6 +20,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/transition:go_default_library", "//beacon-chain/core/transition:go_default_library",
"//beacon-chain/state:go_default_library", "//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library", "//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library", "//consensus-types/interfaces:go_default_library",
"//consensus-types/payload-attribute:go_default_library", "//consensus-types/payload-attribute:go_default_library",
@@ -53,6 +54,7 @@ go_test(
"//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/state:go_default_library", "//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen/mock:go_default_library",
"//config/fieldparams:go_default_library", "//config/fieldparams:go_default_library",
"//config/params:go_default_library", "//config/params:go_default_library",
"//consensus-types/blocks:go_default_library", "//consensus-types/blocks:go_default_library",

View File

@@ -681,48 +681,48 @@ var zeroRoot [32]byte
// needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have // needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have
// all of the checked fields empty, so the logical short circuit should hit immediately. // all of the checked fields empty, so the logical short circuit should hit immediately.
func needsFill(ev payloadattribute.EventData) bool { func needsFill(ev payloadattribute.EventData) bool {
return ev.HeadState == nil || ev.HeadState.IsNil() || ev.HeadState.LatestBlockHeader() == nil || return len(ev.ParentBlockHash) == 0 ||
ev.HeadBlock == nil || ev.HeadBlock.IsNil() ||
ev.HeadRoot == zeroRoot || len(ev.ParentBlockRoot) == 0 || len(ev.ParentBlockHash) == 0 ||
ev.Attributer == nil || ev.Attributer.IsEmpty() ev.Attributer == nil || ev.Attributer.IsEmpty()
} }
func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) { func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) {
var err error
if !needsFill(ev) { if !needsFill(ev) {
return ev, nil return ev, nil
} }
if ev.HeadBlock == nil || ev.HeadBlock.IsNil() {
ev.HeadState, err = s.HeadFetcher.HeadState(ctx) return ev, errors.New("head block is nil")
if err != nil { }
return ev, errors.Wrap(err, "could not get head state") if ev.HeadRoot == zeroRoot {
return ev, errors.New("head root is empty")
} }
ev.HeadBlock, err = s.HeadFetcher.HeadBlock(ctx) var err error
if err != nil { var st state.BeaconState
return ev, errors.Wrap(err, "could not look up head block")
}
ev.HeadRoot, err = ev.HeadBlock.Block().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "could not compute head block root")
}
pr := ev.HeadBlock.Block().ParentRoot()
ev.ParentBlockRoot = pr[:]
hsr, err := ev.HeadState.LatestBlockHeader().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "could not compute latest block header root")
}
// If head is in the same block as the proposal slot, we can use the "read only" state cache.
pse := slots.ToEpoch(ev.ProposalSlot) pse := slots.ToEpoch(ev.ProposalSlot)
st := ev.HeadState if slots.ToEpoch(ev.HeadBlock.Block().Slot()) == pse {
if slots.ToEpoch(st.Slot()) != pse { st = s.StateGen.StateByRootIfCachedNoCopy(ev.HeadRoot)
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, hsr[:], ev.ProposalSlot) }
// If st is nil, we couldn't get the state from the cache, or it isn't in the same epoch.
if st == nil || st.IsNil() {
st, err = s.StateGen.StateByRoot(ctx, ev.HeadRoot)
if err != nil { if err != nil {
return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch") return ev, errors.Wrap(err, "could not get head state")
}
// double check that we need to process_slots, just in case we got here via a hot state cache miss.
if slots.ToEpoch(st.Slot()) == pse {
start, err := slots.EpochStart(pse)
if err != nil {
return ev, errors.Wrap(err, "invalid state slot; could not compute epoch start")
}
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, ev.HeadRoot[:], start)
if err != nil {
return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch")
}
} }
} }
ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot) ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot)
if err != nil { if err != nil {
return ev, errors.Wrap(err, "failed to compute proposer index") return ev, errors.Wrap(err, "failed to compute proposer index")
@@ -743,7 +743,7 @@ func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventDat
if err != nil { if err != nil {
return ev, errors.Wrap(err, "could not get head state slot time") return ev, errors.Wrap(err, "could not get head state slot time")
} }
ev.Attributer, err = s.computePayloadAttributes(ctx, st, hsr, ev.ProposerIndex, uint64(t.Unix()), randao) ev.Attributer, err = s.computePayloadAttributes(ctx, st, ev.HeadRoot, ev.ProposerIndex, uint64(t.Unix()), randao)
return ev, err return ev, err
} }
@@ -772,7 +772,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
ProposerIndex: strconv.FormatUint(uint64(ev.ProposerIndex), 10), ProposerIndex: strconv.FormatUint(uint64(ev.ProposerIndex), 10),
ProposalSlot: strconv.FormatUint(uint64(ev.ProposalSlot), 10), ProposalSlot: strconv.FormatUint(uint64(ev.ProposalSlot), 10),
ParentBlockNumber: strconv.FormatUint(ev.ParentBlockNumber, 10), ParentBlockNumber: strconv.FormatUint(ev.ParentBlockNumber, 10),
ParentBlockRoot: hexutil.Encode(ev.ParentBlockRoot), ParentBlockRoot: hexutil.Encode(ev.HeadRoot[:]),
ParentBlockHash: hexutil.Encode(ev.ParentBlockHash), ParentBlockHash: hexutil.Encode(ev.ParentBlockHash),
PayloadAttributes: attributesBytes, PayloadAttributes: attributesBytes,
}) })

View File

@@ -18,6 +18,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state" statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state" "github.com/OffchainLabs/prysm/v6/beacon-chain/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen/mock"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -528,9 +529,13 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
Block: b, Block: b,
Slot: &currentSlot, Slot: &currentSlot,
} }
headRoot, err := b.Block().HashTreeRoot()
require.NoError(t, err)
stn := mockChain.NewEventFeedWrapper() stn := mockChain.NewEventFeedWrapper()
opn := mockChain.NewEventFeedWrapper() opn := mockChain.NewEventFeedWrapper()
stategen := mock.NewService()
stategen.AddStateForRoot(st, headRoot)
s := &Server{ s := &Server{
StateNotifier: &mockChain.SimpleNotifier{Feed: stn}, StateNotifier: &mockChain.SimpleNotifier{Feed: stn},
OperationNotifier: &mockChain.SimpleNotifier{Feed: opn}, OperationNotifier: &mockChain.SimpleNotifier{Feed: opn},
@@ -538,6 +543,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
ChainInfoFetcher: mockChainService, ChainInfoFetcher: mockChainService,
TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), TrackedValidatorsCache: cache.NewTrackedValidatorsCache(),
EventWriteTimeout: testEventWriteTimeout, EventWriteTimeout: testEventWriteTimeout,
StateGen: stategen,
} }
if tc.SetTrackedValidatorsCache != nil { if tc.SetTrackedValidatorsCache != nil {
tc.SetTrackedValidatorsCache(s.TrackedValidatorsCache) tc.SetTrackedValidatorsCache(s.TrackedValidatorsCache)
@@ -553,11 +559,9 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
ProposerIndex: 0, ProposerIndex: 0,
ProposalSlot: 0, ProposalSlot: 0,
ParentBlockNumber: 0, ParentBlockNumber: 0,
ParentBlockRoot: make([]byte, 32),
ParentBlockHash: make([]byte, 32), ParentBlockHash: make([]byte, 32),
HeadState: st,
HeadBlock: b, HeadBlock: b,
HeadRoot: [fieldparams.RootLength]byte{}, HeadRoot: headRoot,
}, },
}, },
} }
@@ -575,8 +579,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
func TestFillEventData(t *testing.T) { func TestFillEventData(t *testing.T) {
ctx := context.Background() ctx := context.Background()
t.Run("AlreadyFilledData_ShouldShortCircuitWithoutError", func(t *testing.T) { t.Run("AlreadyFilledData_ShouldShortCircuitWithoutError", func(t *testing.T) {
st, err := util.NewBeaconStateBellatrix()
require.NoError(t, err)
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockBellatrix(&eth.SignedBeaconBlockBellatrix{})) b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockBellatrix(&eth.SignedBeaconBlockBellatrix{}))
require.NoError(t, err) require.NoError(t, err)
attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{ attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{
@@ -584,11 +586,9 @@ func TestFillEventData(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
alreadyFilled := payloadattribute.EventData{ alreadyFilled := payloadattribute.EventData{
HeadState: st,
HeadBlock: b, HeadBlock: b,
HeadRoot: [32]byte{1, 2, 3}, HeadRoot: [32]byte{1, 2, 3},
Attributer: attributor, Attributer: attributor,
ParentBlockRoot: []byte{1, 2, 3},
ParentBlockHash: []byte{4, 5, 6}, ParentBlockHash: []byte{4, 5, 6},
} }
srv := &Server{} // No real HeadFetcher needed here since it won't be called. srv := &Server{} // No real HeadFetcher needed here since it won't be called.
@@ -612,12 +612,14 @@ func TestFillEventData(t *testing.T) {
Timestamp: uint64(time.Now().Unix()), Timestamp: uint64(time.Now().Unix()),
}) })
require.NoError(t, err) require.NoError(t, err)
headRoot, err := b.Block().HashTreeRoot()
require.NoError(t, err)
// Create an event data object missing certain fields: // Create an event data object missing certain fields:
partial := payloadattribute.EventData{ partial := payloadattribute.EventData{
// The presence of a nil HeadState, nil HeadBlock, zeroed HeadRoot, etc.
// will cause fillEventData to try to fill the values.
ProposalSlot: 42, // different epoch from current slot ProposalSlot: 42, // different epoch from current slot
Attributer: attributor, // Must be Bellatrix or later Attributer: attributor, // Must be Bellatrix or later
HeadBlock: b,
HeadRoot: headRoot,
} }
currentSlot := primitives.Slot(0) currentSlot := primitives.Slot(0)
// to avoid slot processing // to avoid slot processing
@@ -629,6 +631,8 @@ func TestFillEventData(t *testing.T) {
Slot: &currentSlot, Slot: &currentSlot,
} }
stategen := mock.NewService()
stategen.AddStateForRoot(st, headRoot)
stn := mockChain.NewEventFeedWrapper() stn := mockChain.NewEventFeedWrapper()
opn := mockChain.NewEventFeedWrapper() opn := mockChain.NewEventFeedWrapper()
srv := &Server{ srv := &Server{
@@ -638,16 +642,15 @@ func TestFillEventData(t *testing.T) {
ChainInfoFetcher: mockChainService, ChainInfoFetcher: mockChainService,
TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), TrackedValidatorsCache: cache.NewTrackedValidatorsCache(),
EventWriteTimeout: testEventWriteTimeout, EventWriteTimeout: testEventWriteTimeout,
StateGen: stategen,
} }
filled, err := srv.fillEventData(ctx, partial) filled, err := srv.fillEventData(ctx, partial)
require.NoError(t, err, "expected successful fill of partial event data") require.NoError(t, err, "expected successful fill of partial event data")
// Verify that fields have been updated from the mock data: // Verify that fields have been updated from the mock data:
require.NotNil(t, filled.HeadState, "HeadState should be assigned")
require.NotNil(t, filled.HeadBlock, "HeadBlock should be assigned") require.NotNil(t, filled.HeadBlock, "HeadBlock should be assigned")
require.NotEqual(t, [32]byte{}, filled.HeadRoot, "HeadRoot should no longer be zero") require.NotEqual(t, [32]byte{}, filled.HeadRoot, "HeadRoot should no longer be zero")
require.NotEmpty(t, filled.ParentBlockRoot, "ParentBlockRoot should be filled")
require.NotEmpty(t, filled.ParentBlockHash, "ParentBlockHash should be filled") require.NotEmpty(t, filled.ParentBlockHash, "ParentBlockHash should be filled")
require.Equal(t, uint64(0), filled.ParentBlockNumber, "ParentBlockNumber must match mock block") require.Equal(t, uint64(0), filled.ParentBlockNumber, "ParentBlockNumber must match mock block")

View File

@@ -10,6 +10,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
opfeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation" opfeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state" statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen"
) )
// Server defines a server implementation of the http events service, // Server defines a server implementation of the http events service,
@@ -23,4 +24,5 @@ type Server struct {
KeepAliveInterval time.Duration KeepAliveInterval time.Duration
EventFeedDepth int EventFeedDepth int
EventWriteTimeout time.Duration EventWriteTimeout time.Duration
StateGen stategen.StateManager
} }

View File

@@ -23,8 +23,8 @@ func NewService() *StateManager {
} }
// StateByRootIfCachedNoCopy -- // StateByRootIfCachedNoCopy --
func (_ *StateManager) StateByRootIfCachedNoCopy(_ [32]byte) state.BeaconState { func (m *StateManager) StateByRootIfCachedNoCopy(root [32]byte) state.BeaconState {
panic("implement me") return m.StatesByRoot[root]
} }
// Resume -- // Resume --

View File

@@ -0,0 +1,2 @@
### Fixed
- Ensure that the `payload_attributes` event has a consistent view of the head state by passing the head block in the event and using stategen to retrieve the corresponding state.

View File

@@ -10,7 +10,6 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v6/consensus-types/payload-attribute", importpath = "github.com/OffchainLabs/prysm/v6/consensus-types/payload-attribute",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//beacon-chain/state:go_default_library",
"//config/fieldparams:go_default_library", "//config/fieldparams:go_default_library",
"//consensus-types:go_default_library", "//consensus-types:go_default_library",
"//consensus-types/blocks:go_default_library", "//consensus-types/blocks:go_default_library",

View File

@@ -1,7 +1,6 @@
package payloadattribute package payloadattribute
import ( import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
field_params "github.com/OffchainLabs/prysm/v6/config/fieldparams" field_params "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
@@ -100,10 +99,8 @@ type EventData struct {
ProposerIndex primitives.ValidatorIndex ProposerIndex primitives.ValidatorIndex
ProposalSlot primitives.Slot ProposalSlot primitives.Slot
ParentBlockNumber uint64 ParentBlockNumber uint64
ParentBlockRoot []byte
ParentBlockHash []byte ParentBlockHash []byte
Attributer Attributer Attributer Attributer
HeadState state.BeaconState
HeadBlock interfaces.ReadOnlySignedBeaconBlock HeadBlock interfaces.ReadOnlySignedBeaconBlock
HeadRoot [field_params.RootLength]byte HeadRoot [field_params.RootLength]byte
} }