From 66342655fd4f9280bdc1ae59f3bc164f380fc905 Mon Sep 17 00:00:00 2001 From: james-prysm <90280386+james-prysm@users.noreply.github.com> Date: Thu, 18 Dec 2025 07:07:09 -0800 Subject: [PATCH] throw 503 error when submit attestation and sync committee are called on syncing node + align changes to gRPC (#16152) **What type of PR is this?** Bug fix **What does this PR do? Why is it needed?** Prysm started throwing this error `Could not write response message" error="write tcp 10.104.92.212:5052->10.104.92.196:41876: write: broken pipe` because a validator got attestation data from a synced node and submitted the attestation to a syncing node. When the node couldn't replay the state, the validator's request context deadlined and the client disconnected, so when the writer finally tried to write the response it got this broken pipe error. This applies to `/eth/v2/beacon/pool/attestations` and `/eth/v1/beacon/pool/sync_committees`. The solution is two-part: 1. we shouldn't allow submission of an attestation if the node is syncing, because we can't save the attestation without the state information. 2. in REST we were doing the expensive state call before broadcasting; it now matches gRPC, where it happens afterward in its own goroutine. 
Tested manually running kurtosis with rest validators ``` participants: # Super-nodes - el_type: nethermind cl_type: prysm cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest count: 2 supernode: true cl_extra_params: - --subscribe-all-subnets - --verbosity=debug vc_extra_params: - --enable-beacon-rest-api - --verbosity=debug # Full-nodes - el_type: nethermind cl_type: prysm cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest validator_count: 63 cl_extra_params: - --verbosity=debug vc_extra_params: - --enable-beacon-rest-api - --verbosity=debug - el_type: nethermind cl_type: prysm cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest cl_extra_params: - --verbosity=debug vc_extra_params: - --enable-beacon-rest-api - --verbosity=debug validator_count: 13 additional_services: - dora - spamoor spamoor_params: image: ethpandaops/spamoor:master max_mem: 4000 spammers: - scenario: eoatx config: throughput: 200 - scenario: blobs config: throughput: 20 network_params: fulu_fork_epoch: 2 bpo_1_epoch: 8 bpo_1_max_blobs: 21 withdrawal_type: "0x02" preset: mainnet seconds_per_slot: 6 global_log_level: debug ``` **Which issues(s) does this PR fix?** Fixes # **Other notes for review** **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable). 
--- beacon-chain/rpc/eth/beacon/handlers_pool.go | 56 ++++++---- .../rpc/eth/beacon/handlers_pool_test.go | 101 +++++++++++++++--- .../rpc/prysm/v1alpha1/validator/attester.go | 42 ++++---- .../prysm/v1alpha1/validator/attester_test.go | 33 ++++++ .../james-prysm_align-atter-pool-apis.md | 3 + 5 files changed, 185 insertions(+), 50 deletions(-) create mode 100644 changelog/james-prysm_align-atter-pool-apis.md diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool.go b/beacon-chain/rpc/eth/beacon/handlers_pool.go index b26b298976..8b7be09d05 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool.go @@ -130,6 +130,10 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2") defer span.End() + if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) { + return + } + versionHeader := r.Header.Get(api.VersionHeader) if versionHeader == "" { httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest) @@ -238,22 +242,14 @@ func (s *Server) handleAttestationsElectra( }, }) - targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target) - if err != nil { - return nil, nil, errors.Wrap(err, "could not get target state for attestation") - } - committee, err := corehelpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.CommitteeId) - if err != nil { - return nil, nil, errors.Wrap(err, "could not get committee for attestation") - } - att := singleAtt.ToAttestationElectra(committee) - - wantedEpoch := slots.ToEpoch(att.Data.Slot) + // Broadcast first using CommitteeId directly (fast path) + // This matches gRPC behavior and avoids blocking on state fetching + wantedEpoch := slots.ToEpoch(singleAtt.Data.Slot) vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) if err != nil { return nil, 
nil, errors.Wrap(err, "could not get head validator indices") } - subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot) + subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), singleAtt.CommitteeId, singleAtt.Data.Slot) if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil { failedBroadcasts = append(failedBroadcasts, &server.IndexedError{ Index: i, @@ -264,17 +260,35 @@ func (s *Server) handleAttestationsElectra( } continue } + } - if features.Get().EnableExperimentalAttestationPool { - if err = s.AttestationCache.Add(att); err != nil { - log.WithError(err).Error("Could not save attestation") + // Save to pool after broadcast (slow path - requires state fetching) + // Run in goroutine to avoid blocking the HTTP response + go func() { + for _, singleAtt := range validAttestations { + targetState, err := s.AttestationStateFetcher.AttestationTargetState(context.Background(), singleAtt.Data.Target) + if err != nil { + log.WithError(err).Error("Could not get target state for attestation") + continue } - } else { - if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { - log.WithError(err).Error("Could not save attestation") + committee, err := corehelpers.BeaconCommitteeFromState(context.Background(), targetState, singleAtt.Data.Slot, singleAtt.CommitteeId) + if err != nil { + log.WithError(err).Error("Could not get committee for attestation") + continue + } + att := singleAtt.ToAttestationElectra(committee) + + if features.Get().EnableExperimentalAttestationPool { + if err = s.AttestationCache.Add(att); err != nil { + log.WithError(err).Error("Could not save attestation") + } + } else { + if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { + log.WithError(err).Error("Could not save attestation") + } } } - } + }() if len(failedBroadcasts) > 0 { log.WithFields(logrus.Fields{ @@ -470,6 +484,10 @@ func (s *Server) 
SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures") defer span.End() + if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) { + return + } + var req structs.SubmitSyncCommitteeSignaturesRequest err := json.NewDecoder(r.Body).Decode(&req.Data) switch { diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go index f86734b1b3..0601428371 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go @@ -26,6 +26,7 @@ import ( "github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock" p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing" "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core" + mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing" state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native" "github.com/OffchainLabs/prysm/v7/config/params" "github.com/OffchainLabs/prysm/v7/consensus-types/primitives" @@ -622,6 +623,8 @@ func TestSubmitAttestationsV2(t *testing.T) { HeadFetcher: chainService, ChainInfoFetcher: chainService, TimeFetcher: chainService, + OptimisticModeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, OperationNotifier: &blockchainmock.MockOperationNotifier{}, AttestationStateFetcher: chainService, } @@ -654,6 +657,7 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch) assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root)) assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch) + time.Sleep(100 * time.Millisecond) // Wait for async pool save 
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount()) }) t.Run("multiple", func(t *testing.T) { @@ -673,6 +677,7 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, http.StatusOK, writer.Code) assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) assert.Equal(t, 2, broadcaster.NumAttestations()) + time.Sleep(100 * time.Millisecond) // Wait for async pool save assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount()) }) t.Run("phase0 att post electra", func(t *testing.T) { @@ -793,6 +798,7 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch) assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root)) assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch) + time.Sleep(100 * time.Millisecond) // Wait for async pool save assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount()) }) t.Run("multiple", func(t *testing.T) { @@ -812,6 +818,7 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, http.StatusOK, writer.Code) assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) assert.Equal(t, 2, broadcaster.NumAttestations()) + time.Sleep(100 * time.Millisecond) // Wait for async pool save assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount()) }) t.Run("no body", func(t *testing.T) { @@ -861,6 +868,27 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature")) }) }) + t.Run("syncing", func(t *testing.T) { + chainService := &blockchainmock.ChainService{} + s := &Server{ + HeadFetcher: chainService, + TimeFetcher: chainService, + OptimisticModeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: true}, + } + + var body bytes.Buffer + _, err := 
body.WriteString(singleAtt) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + request.Header.Set(api.VersionHeader, version.String(version.Phase0)) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitAttestationsV2(writer, request) + assert.Equal(t, http.StatusServiceUnavailable, writer.Code) + assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing")) + }) } func TestListVoluntaryExits(t *testing.T) { @@ -1057,14 +1085,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) { t.Run("single", func(t *testing.T) { broadcaster := &p2pMock.MockBroadcaster{} + chainService := &blockchainmock.ChainService{ + State: st, + SyncCommitteeIndices: []primitives.CommitteeIndex{0}, + } s := &Server{ + HeadFetcher: chainService, + TimeFetcher: chainService, + OptimisticModeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, CoreService: &core.Service{ SyncCommitteePool: synccommittee.NewStore(), P2P: broadcaster, - HeadFetcher: &blockchainmock.ChainService{ - State: st, - SyncCommitteeIndices: []primitives.CommitteeIndex{0}, - }, + HeadFetcher: chainService, }, } @@ -1089,14 +1122,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) { }) t.Run("multiple", func(t *testing.T) { broadcaster := &p2pMock.MockBroadcaster{} + chainService := &blockchainmock.ChainService{ + State: st, + SyncCommitteeIndices: []primitives.CommitteeIndex{0}, + } s := &Server{ + HeadFetcher: chainService, + TimeFetcher: chainService, + OptimisticModeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, CoreService: &core.Service{ SyncCommitteePool: synccommittee.NewStore(), P2P: broadcaster, - HeadFetcher: &blockchainmock.ChainService{ - State: st, - SyncCommitteeIndices: []primitives.CommitteeIndex{0}, - }, + HeadFetcher: chainService, }, } @@ -1120,13 +1158,18 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) { }) t.Run("invalid", 
func(t *testing.T) { broadcaster := &p2pMock.MockBroadcaster{} + chainService := &blockchainmock.ChainService{ + State: st, + } s := &Server{ + HeadFetcher: chainService, + TimeFetcher: chainService, + OptimisticModeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, CoreService: &core.Service{ SyncCommitteePool: synccommittee.NewStore(), P2P: broadcaster, - HeadFetcher: &blockchainmock.ChainService{ - State: st, - }, + HeadFetcher: chainService, }, } @@ -1149,7 +1192,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) { assert.Equal(t, false, broadcaster.BroadcastCalled.Load()) }) t.Run("empty", func(t *testing.T) { - s := &Server{} + chainService := &blockchainmock.ChainService{State: st} + s := &Server{ + HeadFetcher: chainService, + TimeFetcher: chainService, + OptimisticModeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + } var body bytes.Buffer _, err := body.WriteString("[]") @@ -1166,7 +1215,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) { assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) }) t.Run("no body", func(t *testing.T) { - s := &Server{} + chainService := &blockchainmock.ChainService{State: st} + s := &Server{ + HeadFetcher: chainService, + TimeFetcher: chainService, + OptimisticModeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + } request := httptest.NewRequest(http.MethodPost, "http://example.com", nil) writer := httptest.NewRecorder() @@ -1179,6 +1234,26 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) { assert.Equal(t, http.StatusBadRequest, e.Code) assert.Equal(t, true, strings.Contains(e.Message, "No data submitted")) }) + t.Run("syncing", func(t *testing.T) { + chainService := &blockchainmock.ChainService{State: st} + s := &Server{ + HeadFetcher: chainService, + TimeFetcher: chainService, + OptimisticModeFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: true}, + } + + var body bytes.Buffer + _, err := 
body.WriteString(singleSyncCommitteeMsg) + require.NoError(t, err) + request := httptest.NewRequest(http.MethodPost, "http://example.com", &body) + writer := httptest.NewRecorder() + writer.Body = &bytes.Buffer{} + + s.SubmitSyncCommitteeSignatures(writer, request) + assert.Equal(t, http.StatusServiceUnavailable, writer.Code) + assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing")) + }) } func TestListBLSToExecutionChanges(t *testing.T) { diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go b/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go index d23cd25186..e4e386c223 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go @@ -52,24 +52,27 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation") defer span.End() + if vs.SyncChecker.Syncing() { + return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond") + } + resp, err := vs.proposeAtt(ctx, att, att.GetData().CommitteeIndex) if err != nil { return nil, err } - if features.Get().EnableExperimentalAttestationPool { - if err = vs.AttestationCache.Add(att); err != nil { - log.WithError(err).Error("Could not save attestation") - } - } else { - go func() { + go func() { + if features.Get().EnableExperimentalAttestationPool { + if err := vs.AttestationCache.Add(att); err != nil { + log.WithError(err).Error("Could not save attestation") + } + } else { attCopy := att.Copy() if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil { log.WithError(err).Error("Could not save unaggregated attestation") - return } - }() - } + } + }() return resp, nil } @@ -82,6 +85,10 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestationElectra") defer span.End() + if 
vs.SyncChecker.Syncing() { + return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond") + } + resp, err := vs.proposeAtt(ctx, singleAtt, singleAtt.GetCommitteeIndex()) if err != nil { return nil, err @@ -98,18 +105,17 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp singleAttCopy := singleAtt.Copy() att := singleAttCopy.ToAttestationElectra(committee) - if features.Get().EnableExperimentalAttestationPool { - if err = vs.AttestationCache.Add(att); err != nil { - log.WithError(err).Error("Could not save attestation") - } - } else { - go func() { + go func() { + if features.Get().EnableExperimentalAttestationPool { + if err := vs.AttestationCache.Add(att); err != nil { + log.WithError(err).Error("Could not save attestation") + } + } else { if err := vs.AttPool.SaveUnaggregatedAttestation(att); err != nil { log.WithError(err).Error("Could not save unaggregated attestation") - return } - }() - } + } + }() return resp, nil } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go index 70916c4ac7..064e3f32b2 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go @@ -38,6 +38,7 @@ func TestProposeAttestation(t *testing.T) { OperationNotifier: (&mock.ChainService{}).OperationNotifier(), TimeFetcher: chainService, AttestationStateFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, } head := util.NewBeaconBlock() head.Block.Slot = 999 @@ -141,6 +142,7 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) { P2P: &mockp2p.MockBroadcaster{}, AttPool: attestations.NewPool(), OperationNotifier: (&mock.ChainService{}).OperationNotifier(), + SyncChecker: &mockSync.Sync{IsSyncing: false}, } req := util.HydrateAttestation(ðpb.Attestation{}) @@ -149,6 +151,37 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) { 
assert.ErrorContains(t, wanted, err) } +func TestProposeAttestation_Syncing(t *testing.T) { + attesterServer := &Server{ + SyncChecker: &mockSync.Sync{IsSyncing: true}, + } + + req := util.HydrateAttestation(&ethpb.Attestation{}) + _, err := attesterServer.ProposeAttestation(t.Context(), req) + assert.ErrorContains(t, "Syncing to latest head", err) + s, ok := status.FromError(err) + require.Equal(t, true, ok) + assert.Equal(t, codes.Unavailable, s.Code()) +} + +func TestProposeAttestationElectra_Syncing(t *testing.T) { + attesterServer := &Server{ + SyncChecker: &mockSync.Sync{IsSyncing: true}, + } + + req := &ethpb.SingleAttestation{ + Data: &ethpb.AttestationData{ + Source: &ethpb.Checkpoint{Root: make([]byte, 32)}, + Target: &ethpb.Checkpoint{Root: make([]byte, 32)}, + }, + } + _, err := attesterServer.ProposeAttestationElectra(t.Context(), req) + assert.ErrorContains(t, "Syncing to latest head", err) + s, ok := status.FromError(err) + require.Equal(t, true, ok) + assert.Equal(t, codes.Unavailable, s.Code()) +} + func TestGetAttestationData_OK(t *testing.T) { block := util.NewBeaconBlock() block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1 diff --git a/changelog/james-prysm_align-atter-pool-apis.md b/changelog/james-prysm_align-atter-pool-apis.md new file mode 100644 index 0000000000..53de89b44b --- /dev/null +++ b/changelog/james-prysm_align-atter-pool-apis.md @@ -0,0 +1,3 @@ +### Changed + +- The `/eth/v2/beacon/pool/attestations` and `/eth/v1/beacon/pool/sync_committees` endpoints now return a 503 error if the node is still syncing; the REST API also now broadcasts first and saves to the pool afterward, matching gRPC behavior. \ No newline at end of file