From 4374e709cb9a2ef785d9c1daff9135bfc766fa69 Mon Sep 17 00:00:00 2001 From: james-prysm <90280386+james-prysm@users.noreply.github.com> Date: Fri, 12 Dec 2025 15:18:22 -0800 Subject: [PATCH] fixing state replay caused by REST api duties attester and sync committee endpoints (#16136) **What type of PR is this?** Bug fix **What does this PR do? Why is it needed?** s.Stater.StateBySlot may replay if it's the current epoch as it's for values in the db, if we are in the current we should try to get head slot and use the cache, proposer duties was doing this already but the other 2 duties endpoints was not. this pr aligns all 3 and introduces a new `statebyepoch` that just wraps the approach. I tested by running this kurtosis config with and without the fix to see that the replays stop, the blockchain progresses, and the upgraded to fulu is not printed multiple times ``` participants: # Super-nodes - el_type: nethermind cl_type: prysm cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest count: 2 supernode: true cl_extra_params: - --subscribe-all-subnets - --verbosity=debug vc_extra_params: - --enable-beacon-rest-api - --verbosity=debug # Full-nodes - el_type: nethermind cl_type: prysm cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest validator_count: 63 cl_extra_params: - --verbosity=debug vc_extra_params: - --enable-beacon-rest-api - --verbosity=debug - el_type: nethermind cl_type: prysm cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest cl_extra_params: - --verbosity=debug vc_extra_params: - --enable-beacon-rest-api - --verbosity=debug validator_count: 13 additional_services: - dora - spamoor spamoor_params: image: ethpandaops/spamoor:master max_mem: 4000 spammers: - scenario: eoatx config: throughput: 200 - scenario: blobs config: throughput: 20 network_params: fulu_fork_epoch: 2 bpo_1_epoch: 8 bpo_1_max_blobs: 21 withdrawal_type: "0x02" preset: mainnet seconds_per_slot: 6 global_log_level: debug ``` **Which issues(s) does this PR fix?** Fixes # https://github.com/OffchainLabs/prysm/issues/16135 **Other notes for review** **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable). --- .../rpc/eth/beacon/handlers_state_test.go | 4 + beacon-chain/rpc/eth/validator/BUILD.bazel | 2 +- beacon-chain/rpc/eth/validator/handlers.go | 56 +----- .../rpc/eth/validator/handlers_test.go | 178 +++++++++++++++++- beacon-chain/rpc/lookup/BUILD.bazel | 1 + beacon-chain/rpc/lookup/stater.go | 42 +++++ beacon-chain/rpc/lookup/stater_test.go | 108 +++++++++++ beacon-chain/rpc/testutil/BUILD.bazel | 1 + beacon-chain/rpc/testutil/mock_stater.go | 21 +++ .../james-prysm_fix-rest-replay-state.md | 3 + 10 files changed, 363 insertions(+), 53 deletions(-) create mode 100644 changelog/james-prysm_fix-rest-replay-state.md diff --git a/beacon-chain/rpc/eth/beacon/handlers_state_test.go b/beacon-chain/rpc/eth/beacon/handlers_state_test.go index 190a4d6c11..8ee4263927 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_state_test.go +++ b/beacon-chain/rpc/eth/beacon/handlers_state_test.go @@ -654,6 +654,10 @@ func (m *futureSyncMockFetcher) StateBySlot(context.Context, primitives.Slot) (s return m.BeaconState, nil } +func (m *futureSyncMockFetcher) StateByEpoch(context.Context, primitives.Epoch) (state.BeaconState, error) { + return m.BeaconState, nil +} + func TestGetSyncCommittees_Future(t *testing.T) { st, _ := util.DeterministicGenesisStateAltair(t, params.BeaconConfig().SyncCommitteeSize) syncCommittee := make([][]byte, params.BeaconConfig().SyncCommitteeSize) diff --git a/beacon-chain/rpc/eth/validator/BUILD.bazel b/beacon-chain/rpc/eth/validator/BUILD.bazel index 31ea92adb5..962d5c05d2 100644 --- a/beacon-chain/rpc/eth/validator/BUILD.bazel +++ b/beacon-chain/rpc/eth/validator/BUILD.bazel @@ -19,7 +19,6 @@ go_library( "//beacon-chain/cache:go_default_library", "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/helpers:go_default_library", - "//beacon-chain/core/transition:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/synccommittee:go_default_library", @@ -78,6 +77,7 @@ go_test( "//beacon-chain/rpc/core:go_default_library", "//beacon-chain/rpc/eth/rewards/testing:go_default_library", "//beacon-chain/rpc/eth/shared/testing:go_default_library", + "//beacon-chain/rpc/lookup:go_default_library", "//beacon-chain/rpc/testutil:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/stategen:go_default_library", diff --git a/beacon-chain/rpc/eth/validator/handlers.go b/beacon-chain/rpc/eth/validator/handlers.go index e2ea1b523e..bf2d6851c6 100644 --- a/beacon-chain/rpc/eth/validator/handlers.go +++ b/beacon-chain/rpc/eth/validator/handlers.go @@ -19,7 +19,6 @@ import ( "github.com/OffchainLabs/prysm/v7/beacon-chain/builder" "github.com/OffchainLabs/prysm/v7/beacon-chain/cache" "github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers" - "github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition" "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core" rpchelpers "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/eth/helpers" "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/eth/shared" @@ -898,18 +897,13 @@ func (s *Server) GetAttesterDuties(w http.ResponseWriter, r *http.Request) { return } - var startSlot primitives.Slot + // For next epoch requests, we use the current epoch's state since committee + // assignments for next epoch can be computed from current epoch's state. + epochForState := requestedEpoch if requestedEpoch == nextEpoch { - startSlot, err = slots.EpochStart(currentEpoch) - } else { - startSlot, err = slots.EpochStart(requestedEpoch) + epochForState = currentEpoch } - if err != nil { - httputil.HandleError(w, fmt.Sprintf("Could not get start slot from epoch %d: %v", requestedEpoch, err), http.StatusInternalServerError) - return - } - - st, err := s.Stater.StateBySlot(ctx, startSlot) + st, err := s.Stater.StateByEpoch(ctx, epochForState) if err != nil { shared.WriteStateFetchError(w, err) return @@ -1020,39 +1014,11 @@ func (s *Server) GetProposerDuties(w http.ResponseWriter, r *http.Request) { nextEpochLookahead = true } - epochStartSlot, err := slots.EpochStart(requestedEpoch) + st, err := s.Stater.StateByEpoch(ctx, requestedEpoch) if err != nil { - httputil.HandleError(w, fmt.Sprintf("Could not get start slot of epoch %d: %v", requestedEpoch, err), http.StatusInternalServerError) + shared.WriteStateFetchError(w, err) return } - var st state.BeaconState - // if the requested epoch is new, use the head state and the next slot cache - if requestedEpoch < currentEpoch { - st, err = s.Stater.StateBySlot(ctx, epochStartSlot) - if err != nil { - shared.WriteStateFetchError(w, err) - return - } - } else { - st, err = s.HeadFetcher.HeadState(ctx) - if err != nil { - httputil.HandleError(w, fmt.Sprintf("Could not get head state: %v ", err), http.StatusInternalServerError) - return - } - // Notice that even for Fulu requests for the next epoch, we are only advancing the state to the start of the current epoch. - if st.Slot() < epochStartSlot { - headRoot, err := s.HeadFetcher.HeadRoot(ctx) - if err != nil { - httputil.HandleError(w, fmt.Sprintf("Could not get head root: %v ", err), http.StatusInternalServerError) - return - } - st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, headRoot, epochStartSlot) - if err != nil { - httputil.HandleError(w, fmt.Sprintf("Could not process slots up to %d: %v ", epochStartSlot, err), http.StatusInternalServerError) - return - } - } - } var assignments map[primitives.ValidatorIndex][]primitives.Slot if nextEpochLookahead { @@ -1175,12 +1141,8 @@ func (s *Server) GetSyncCommitteeDuties(w http.ResponseWriter, r *http.Request) } startingEpoch := min(requestedEpoch, currentEpoch) - slot, err := slots.EpochStart(startingEpoch) - if err != nil { - httputil.HandleError(w, "Could not get sync committee slot: "+err.Error(), http.StatusInternalServerError) - return - } - st, err := s.Stater.State(ctx, []byte(strconv.FormatUint(uint64(slot), 10))) + + st, err := s.Stater.StateByEpoch(ctx, startingEpoch) if err != nil { shared.WriteStateFetchError(w, err) return diff --git a/beacon-chain/rpc/eth/validator/handlers_test.go b/beacon-chain/rpc/eth/validator/handlers_test.go index a0e3def4fa..fb186c42ae 100644 --- a/beacon-chain/rpc/eth/validator/handlers_test.go +++ b/beacon-chain/rpc/eth/validator/handlers_test.go @@ -25,6 +25,7 @@ import ( "github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee" p2pmock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing" "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core" + "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/lookup" "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/testutil" "github.com/OffchainLabs/prysm/v7/beacon-chain/state" "github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen" @@ -2006,6 +2007,7 @@ func TestGetAttesterDuties(t *testing.T) { TimeFetcher: chain, SyncChecker: &mockSync.Sync{IsSyncing: false}, OptimisticModeFetcher: chain, + HeadFetcher: chain, BeaconDB: db, } @@ -2184,6 +2186,7 @@ func TestGetAttesterDuties(t *testing.T) { Stater: &testutil.MockStater{StatesBySlot: map[primitives.Slot]state.BeaconState{0: bs}}, TimeFetcher: chain, OptimisticModeFetcher: chain, + HeadFetcher: chain, SyncChecker: &mockSync.Sync{IsSyncing: false}, BeaconDB: db, } @@ -2224,6 +2227,62 @@ func TestGetAttesterDuties(t *testing.T) { require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) assert.Equal(t, http.StatusServiceUnavailable, e.Code) }) + t.Run("state not found returns 404", func(t *testing.T) { + chainSlot := primitives.Slot(0) + chain := &mockChain.ChainService{ + State: bs, Root: genesisRoot[:], Slot: &chainSlot, + } + stateNotFoundErr := lookup.NewStateNotFoundError(8192, []byte("test")) + s := &Server{ + Stater: &testutil.MockStater{CustomError: &stateNotFoundErr}, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + OptimisticModeFetcher: chain, + HeadFetcher: chain, + } + + var body bytes.Buffer + _, err = body.WriteString("[\"0\"]") + require.NoError(t, err) + request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/attester/{epoch}", &body) + request.SetPathValue("epoch", "0") + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.GetAttesterDuties(writer, request) + assert.Equal(t, http.StatusNotFound, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusNotFound, e.Code) + assert.StringContains(t, "State not found", e.Message) + }) + t.Run("state fetch error returns 500", func(t *testing.T) { + chainSlot := primitives.Slot(0) + chain := &mockChain.ChainService{ + State: bs, Root: genesisRoot[:], Slot: &chainSlot, + } + s := &Server{ + Stater: &testutil.MockStater{CustomError: errors.New("internal error")}, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + OptimisticModeFetcher: chain, + HeadFetcher: chain, + } + + var body bytes.Buffer + _, err = body.WriteString("[\"0\"]") + require.NoError(t, err) + request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/attester/{epoch}", &body) + request.SetPathValue("epoch", "0") + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.GetAttesterDuties(writer, request) + assert.Equal(t, http.StatusInternalServerError, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusInternalServerError, e.Code) + }) } func TestGetProposerDuties(t *testing.T) { @@ -2427,6 +2486,60 @@ func TestGetProposerDuties(t *testing.T) { require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) assert.Equal(t, http.StatusServiceUnavailable, e.Code) }) + t.Run("state not found returns 404", func(t *testing.T) { + bs, err := transition.GenesisBeaconState(t.Context(), deposits, 0, eth1Data) + require.NoError(t, err) + chainSlot := primitives.Slot(0) + chain := &mockChain.ChainService{ + State: bs, Root: genesisRoot[:], Slot: &chainSlot, + } + stateNotFoundErr := lookup.NewStateNotFoundError(8192, []byte("test")) + s := &Server{ + Stater: &testutil.MockStater{CustomError: &stateNotFoundErr}, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + OptimisticModeFetcher: chain, + HeadFetcher: chain, + } + + request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/proposer/{epoch}", nil) + request.SetPathValue("epoch", "0") + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.GetProposerDuties(writer, request) + assert.Equal(t, http.StatusNotFound, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusNotFound, e.Code) + assert.StringContains(t, "State not found", e.Message) + }) + t.Run("state fetch error returns 500", func(t *testing.T) { + bs, err := transition.GenesisBeaconState(t.Context(), deposits, 0, eth1Data) + require.NoError(t, err) + chainSlot := primitives.Slot(0) + chain := &mockChain.ChainService{ + State: bs, Root: genesisRoot[:], Slot: &chainSlot, + } + s := &Server{ + Stater: &testutil.MockStater{CustomError: errors.New("internal error")}, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + OptimisticModeFetcher: chain, + HeadFetcher: chain, + } + + request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/proposer/{epoch}", nil) + request.SetPathValue("epoch", "0") + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.GetProposerDuties(writer, request) + assert.Equal(t, http.StatusInternalServerError, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusInternalServerError, e.Code) + }) } func TestGetSyncCommitteeDuties(t *testing.T) { @@ -2457,7 +2570,7 @@ func TestGetSyncCommitteeDuties(t *testing.T) { } require.NoError(t, st.SetNextSyncCommittee(nextCommittee)) - mockChainService := &mockChain.ChainService{Genesis: genesisTime} + mockChainService := &mockChain.ChainService{Genesis: genesisTime, State: st} s := &Server{ Stater: &testutil.MockStater{BeaconState: st}, SyncChecker: &mockSync.Sync{IsSyncing: false}, @@ -2648,7 +2761,7 @@ func TestGetSyncCommitteeDuties(t *testing.T) { return newSyncPeriodSt } } - mockChainService := &mockChain.ChainService{Genesis: genesisTime, Slot: &newSyncPeriodStartSlot} + mockChainService := &mockChain.ChainService{Genesis: genesisTime, Slot: &newSyncPeriodStartSlot, State: newSyncPeriodSt} s := &Server{ Stater: &testutil.MockStater{BeaconState: stateFetchFn(newSyncPeriodStartSlot)}, SyncChecker: &mockSync.Sync{IsSyncing: false}, @@ -2729,8 +2842,7 @@ func TestGetSyncCommitteeDuties(t *testing.T) { slot, err := slots.EpochStart(1) require.NoError(t, err) - st2, err := util.NewBeaconStateBellatrix() - require.NoError(t, err) + st2 := st.Copy() require.NoError(t, st2.SetSlot(slot)) mockChainService := &mockChain.ChainService{ @@ -2744,7 +2856,7 @@ func TestGetSyncCommitteeDuties(t *testing.T) { State: st2, } s := &Server{ - Stater: &testutil.MockStater{BeaconState: st}, + Stater: &testutil.MockStater{BeaconState: st2}, SyncChecker: &mockSync.Sync{IsSyncing: false}, TimeFetcher: mockChainService, HeadFetcher: mockChainService, @@ -2789,6 +2901,62 @@ func TestGetSyncCommitteeDuties(t *testing.T) { require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) assert.Equal(t, http.StatusServiceUnavailable, e.Code) }) + t.Run("state not found returns 404", func(t *testing.T) { + slot := 2 * params.BeaconConfig().SlotsPerEpoch + chainService := &mockChain.ChainService{ + Slot: &slot, + } + stateNotFoundErr := lookup.NewStateNotFoundError(8192, []byte("test")) + s := &Server{ + Stater: &testutil.MockStater{CustomError: &stateNotFoundErr}, + TimeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + OptimisticModeFetcher: chainService, + HeadFetcher: chainService, + } + + var body bytes.Buffer + _, err := body.WriteString("[\"1\"]") + require.NoError(t, err) + request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/sync/{epoch}", &body) + request.SetPathValue("epoch", "1") + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.GetSyncCommitteeDuties(writer, request) + assert.Equal(t, http.StatusNotFound, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusNotFound, e.Code) + assert.StringContains(t, "State not found", e.Message) + }) + t.Run("state fetch error returns 500", func(t *testing.T) { + slot := 2 * params.BeaconConfig().SlotsPerEpoch + chainService := &mockChain.ChainService{ + Slot: &slot, + } + s := &Server{ + Stater: &testutil.MockStater{CustomError: errors.New("internal error")}, + TimeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + OptimisticModeFetcher: chainService, + HeadFetcher: chainService, + } + + var body bytes.Buffer + _, err := body.WriteString("[\"1\"]") + require.NoError(t, err) + request := httptest.NewRequest(http.MethodGet, "http://www.example.com/eth/v1/validator/duties/sync/{epoch}", &body) + request.SetPathValue("epoch", "1") + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.GetSyncCommitteeDuties(writer, request) + assert.Equal(t, http.StatusInternalServerError, writer.Code) + e := &httputil.DefaultJsonError{} + require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) + assert.Equal(t, http.StatusInternalServerError, e.Code) + }) } func TestPrepareBeaconProposer(t *testing.T) { diff --git a/beacon-chain/rpc/lookup/BUILD.bazel b/beacon-chain/rpc/lookup/BUILD.bazel index 42e82a0599..d5f7bdefe3 100644 --- a/beacon-chain/rpc/lookup/BUILD.bazel +++ b/beacon-chain/rpc/lookup/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//beacon-chain/blockchain:go_default_library", "//beacon-chain/core/peerdas:go_default_library", + "//beacon-chain/core/transition:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/rpc/core:go_default_library", diff --git a/beacon-chain/rpc/lookup/stater.go b/beacon-chain/rpc/lookup/stater.go index a3bd0f250f..fc90c4d404 100644 --- a/beacon-chain/rpc/lookup/stater.go +++ b/beacon-chain/rpc/lookup/stater.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain" + "github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition" "github.com/OffchainLabs/prysm/v7/beacon-chain/db" "github.com/OffchainLabs/prysm/v7/beacon-chain/state" "github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen" @@ -98,6 +99,7 @@ type Stater interface { State(ctx context.Context, id []byte) (state.BeaconState, error) StateRoot(ctx context.Context, id []byte) ([]byte, error) StateBySlot(ctx context.Context, slot primitives.Slot) (state.BeaconState, error) + StateByEpoch(ctx context.Context, epoch primitives.Epoch) (state.BeaconState, error) } // BeaconDbStater is an implementation of Stater. It retrieves states from the beacon chain database. @@ -267,6 +269,46 @@ func (p *BeaconDbStater) StateBySlot(ctx context.Context, target primitives.Slot return st, nil } +// StateByEpoch returns the state for the start of the requested epoch. +// For current or next epoch, it uses the head state and next slot cache for efficiency. +// For past epochs, it replays blocks from the most recent canonical state. +func (p *BeaconDbStater) StateByEpoch(ctx context.Context, epoch primitives.Epoch) (state.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "statefetcher.StateByEpoch") + defer span.End() + + targetSlot, err := slots.EpochStart(epoch) + if err != nil { + return nil, errors.Wrap(err, "could not get epoch start slot") + } + + currentSlot := p.GenesisTimeFetcher.CurrentSlot() + currentEpoch := slots.ToEpoch(currentSlot) + + // For past epochs, use the replay mechanism + if epoch < currentEpoch { + return p.StateBySlot(ctx, targetSlot) + } + + // For current or next epoch, use head state + next slot cache (much faster) + headState, err := p.ChainInfoFetcher.HeadState(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not get head state") + } + + // If head state is already at or past the target slot, return it + if headState.Slot() >= targetSlot { + return headState, nil + } + + // Process slots using the next slot cache + headRoot := p.ChainInfoFetcher.CachedHeadRoot() + st, err := transition.ProcessSlotsUsingNextSlotCache(ctx, headState, headRoot[:], targetSlot) + if err != nil { + return nil, errors.Wrapf(err, "could not process slots up to %d", targetSlot) + } + return st, nil +} + func (p *BeaconDbStater) headStateRoot(ctx context.Context) ([]byte, error) { b, err := p.ChainInfoFetcher.HeadBlock(ctx) if err != nil { diff --git a/beacon-chain/rpc/lookup/stater_test.go b/beacon-chain/rpc/lookup/stater_test.go index eaeafbb7f2..a2ffa9140b 100644 --- a/beacon-chain/rpc/lookup/stater_test.go +++ b/beacon-chain/rpc/lookup/stater_test.go @@ -444,3 +444,111 @@ func TestStateBySlot_AfterHeadSlot(t *testing.T) { require.NoError(t, err) assert.Equal(t, primitives.Slot(101), st.Slot()) } + +func TestStateByEpoch(t *testing.T) { + ctx := t.Context() + slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch + + t.Run("current epoch uses head state", func(t *testing.T) { + // Head is at slot 5 (epoch 0), requesting epoch 0 + headSlot := primitives.Slot(5) + headSt, err := statenative.InitializeFromProtoPhase0(ðpb.BeaconState{Slot: headSlot}) + require.NoError(t, err) + + currentSlot := headSlot + mock := &chainMock.ChainService{State: headSt, Slot: ¤tSlot} + p := BeaconDbStater{ChainInfoFetcher: mock, GenesisTimeFetcher: mock} + + st, err := p.StateByEpoch(ctx, 0) + require.NoError(t, err) + // Should return head state since it's already past epoch start + assert.Equal(t, headSlot, st.Slot()) + }) + + t.Run("current epoch processes slots to epoch start", func(t *testing.T) { + // Head is at slot 5 (epoch 0), requesting epoch 1 + // Current slot is 32 (epoch 1), so epoch 1 is current epoch + headSlot := primitives.Slot(5) + headSt, err := statenative.InitializeFromProtoPhase0(ðpb.BeaconState{Slot: headSlot}) + require.NoError(t, err) + + currentSlot := slotsPerEpoch // slot 32, epoch 1 + mock := &chainMock.ChainService{State: headSt, Slot: ¤tSlot} + p := BeaconDbStater{ChainInfoFetcher: mock, GenesisTimeFetcher: mock} + + // Note: This will fail since ProcessSlotsUsingNextSlotCache requires proper setup + // In real usage, the transition package handles this properly + _, err = p.StateByEpoch(ctx, 1) + // The error is expected since we don't have a fully initialized beacon state + // that can process slots (missing committees, etc.) + assert.NotNil(t, err) + }) + + t.Run("past epoch uses replay", func(t *testing.T) { + // Head is at epoch 2, requesting epoch 0 (past) + headSlot := slotsPerEpoch * 2 // slot 64, epoch 2 + headSt, err := statenative.InitializeFromProtoPhase0(ðpb.BeaconState{Slot: headSlot}) + require.NoError(t, err) + + pastEpochSt, err := statenative.InitializeFromProtoPhase0(ðpb.BeaconState{Slot: 0}) + require.NoError(t, err) + + currentSlot := headSlot + mock := &chainMock.ChainService{State: headSt, Slot: ¤tSlot} + mockReplayer := mockstategen.NewReplayerBuilder() + mockReplayer.SetMockStateForSlot(pastEpochSt, 0) + p := BeaconDbStater{ChainInfoFetcher: mock, GenesisTimeFetcher: mock, ReplayerBuilder: mockReplayer} + + st, err := p.StateByEpoch(ctx, 0) + require.NoError(t, err) + assert.Equal(t, primitives.Slot(0), st.Slot()) + }) + + t.Run("next epoch uses head state path", func(t *testing.T) { + // Head is at slot 30 (epoch 0), requesting epoch 1 (next) + // Current slot is 30 (epoch 0), so epoch 1 is next epoch + headSlot := primitives.Slot(30) + headSt, err := statenative.InitializeFromProtoPhase0(ðpb.BeaconState{Slot: headSlot}) + require.NoError(t, err) + + currentSlot := headSlot + mock := &chainMock.ChainService{State: headSt, Slot: ¤tSlot} + p := BeaconDbStater{ChainInfoFetcher: mock, GenesisTimeFetcher: mock} + + // Note: This will fail since ProcessSlotsUsingNextSlotCache requires proper setup + _, err = p.StateByEpoch(ctx, 1) + // The error is expected since we don't have a fully initialized beacon state + assert.NotNil(t, err) + }) + + t.Run("head state already at target slot returns immediately", func(t *testing.T) { + // Head is at slot 32 (epoch 1 start), requesting epoch 1 + headSlot := slotsPerEpoch // slot 32 + headSt, err := statenative.InitializeFromProtoPhase0(ðpb.BeaconState{Slot: headSlot}) + require.NoError(t, err) + + currentSlot := headSlot + mock := &chainMock.ChainService{State: headSt, Slot: ¤tSlot} + p := BeaconDbStater{ChainInfoFetcher: mock, GenesisTimeFetcher: mock} + + st, err := p.StateByEpoch(ctx, 1) + require.NoError(t, err) + assert.Equal(t, headSlot, st.Slot()) + }) + + t.Run("head state past target slot returns head state", func(t *testing.T) { + // Head is at slot 40, requesting epoch 1 (starts at slot 32) + headSlot := primitives.Slot(40) + headSt, err := statenative.InitializeFromProtoPhase0(ðpb.BeaconState{Slot: headSlot}) + require.NoError(t, err) + + currentSlot := headSlot + mock := &chainMock.ChainService{State: headSt, Slot: ¤tSlot} + p := BeaconDbStater{ChainInfoFetcher: mock, GenesisTimeFetcher: mock} + + st, err := p.StateByEpoch(ctx, 1) + require.NoError(t, err) + // Returns head state since it's already >= epoch start + assert.Equal(t, headSlot, st.Slot()) + }) +} diff --git a/beacon-chain/rpc/testutil/BUILD.bazel b/beacon-chain/rpc/testutil/BUILD.bazel index e0b883597e..700a4e1a4a 100644 --- a/beacon-chain/rpc/testutil/BUILD.bazel +++ b/beacon-chain/rpc/testutil/BUILD.bazel @@ -26,5 +26,6 @@ go_library( "//proto/prysm/v1alpha1:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", + "//time/slots:go_default_library", ], ) diff --git a/beacon-chain/rpc/testutil/mock_stater.go b/beacon-chain/rpc/testutil/mock_stater.go index 15b0baea95..5e7e08087a 100644 --- a/beacon-chain/rpc/testutil/mock_stater.go +++ b/beacon-chain/rpc/testutil/mock_stater.go @@ -6,6 +6,7 @@ import ( "github.com/OffchainLabs/prysm/v7/beacon-chain/state" "github.com/OffchainLabs/prysm/v7/consensus-types/primitives" "github.com/OffchainLabs/prysm/v7/encoding/bytesutil" + "github.com/OffchainLabs/prysm/v7/time/slots" ) // MockStater is a fake implementation of lookup.Stater. @@ -14,6 +15,7 @@ type MockStater struct { StateProviderFunc func(ctx context.Context, stateId []byte) (state.BeaconState, error) BeaconStateRoot []byte StatesBySlot map[primitives.Slot]state.BeaconState + StatesByEpoch map[primitives.Epoch]state.BeaconState StatesByRoot map[[32]byte]state.BeaconState CustomError error } @@ -43,3 +45,22 @@ func (m *MockStater) StateRoot(context.Context, []byte) ([]byte, error) { func (m *MockStater) StateBySlot(_ context.Context, s primitives.Slot) (state.BeaconState, error) { return m.StatesBySlot[s], nil } + +// StateByEpoch -- +func (m *MockStater) StateByEpoch(_ context.Context, e primitives.Epoch) (state.BeaconState, error) { + if m.CustomError != nil { + return nil, m.CustomError + } + if m.StatesByEpoch != nil { + return m.StatesByEpoch[e], nil + } + // Fall back to StatesBySlot if StatesByEpoch is not set + slot, err := slots.EpochStart(e) + if err != nil { + return nil, err + } + if m.StatesBySlot != nil { + return m.StatesBySlot[slot], nil + } + return m.BeaconState, nil +} diff --git a/changelog/james-prysm_fix-rest-replay-state.md b/changelog/james-prysm_fix-rest-replay-state.md new file mode 100644 index 0000000000..552477b6a9 --- /dev/null +++ b/changelog/james-prysm_fix-rest-replay-state.md @@ -0,0 +1,3 @@ +### Fixed + +- fixed replay state issue in rest api caused by attester and sync committee duties endpoints \ No newline at end of file