diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool.go b/beacon-chain/rpc/eth/beacon/handlers_pool.go
index b26b298976..8b7be09d05 100644
--- a/beacon-chain/rpc/eth/beacon/handlers_pool.go
+++ b/beacon-chain/rpc/eth/beacon/handlers_pool.go
@@ -130,6 +130,10 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
 	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2")
 	defer span.End()
 
+	if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
+		return
+	}
+
 	versionHeader := r.Header.Get(api.VersionHeader)
 	if versionHeader == "" {
 		httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
@@ -238,22 +242,14 @@ func (s *Server) handleAttestationsElectra(
 			},
 		})
 
-		targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
-		if err != nil {
-			return nil, nil, errors.Wrap(err, "could not get target state for attestation")
-		}
-		committee, err := corehelpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
-		if err != nil {
-			return nil, nil, errors.Wrap(err, "could not get committee for attestation")
-		}
-		att := singleAtt.ToAttestationElectra(committee)
-
-		wantedEpoch := slots.ToEpoch(att.Data.Slot)
+		// Broadcast first using CommitteeId directly (fast path)
+		// This matches gRPC behavior and avoids blocking on state fetching
+		wantedEpoch := slots.ToEpoch(singleAtt.Data.Slot)
 		vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
 		if err != nil {
 			return nil, nil, errors.Wrap(err, "could not get head validator indices")
 		}
-		subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
+		subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), singleAtt.CommitteeId, singleAtt.Data.Slot)
 		if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
 			failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
 				Index: i,
@@ -264,17 +260,35 @@ func (s *Server) handleAttestationsElectra(
 			}
 			continue
 		}
+	}
 
-		if features.Get().EnableExperimentalAttestationPool {
-			if err = s.AttestationCache.Add(att); err != nil {
-				log.WithError(err).Error("Could not save attestation")
+	// Save to pool after broadcast (slow path - requires state fetching)
+	// Run in goroutine to avoid blocking the HTTP response
+	go func() {
+		for _, singleAtt := range validAttestations {
+			targetState, err := s.AttestationStateFetcher.AttestationTargetState(context.Background(), singleAtt.Data.Target)
+			if err != nil {
+				log.WithError(err).Error("Could not get target state for attestation")
+				continue
 			}
-		} else {
-			if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
-				log.WithError(err).Error("Could not save attestation")
+			committee, err := corehelpers.BeaconCommitteeFromState(context.Background(), targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
+			if err != nil {
+				log.WithError(err).Error("Could not get committee for attestation")
+				continue
+			}
+			att := singleAtt.ToAttestationElectra(committee)
+
+			if features.Get().EnableExperimentalAttestationPool {
+				if err = s.AttestationCache.Add(att); err != nil {
+					log.WithError(err).Error("Could not save attestation")
+				}
+			} else {
+				if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
+					log.WithError(err).Error("Could not save attestation")
+				}
 			}
 		}
-	}
+	}()
 
 	if len(failedBroadcasts) > 0 {
 		log.WithFields(logrus.Fields{
@@ -470,6 +484,10 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
 	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures")
 	defer span.End()
 
+	if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
+		return
+	}
+
 	var req structs.SubmitSyncCommitteeSignaturesRequest
 	err := json.NewDecoder(r.Body).Decode(&req.Data)
 	switch {
diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go
index f86734b1b3..0601428371 100644
--- a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go
+++ b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
 	p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
 	"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
+	mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
 	state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
 	"github.com/OffchainLabs/prysm/v7/config/params"
 	"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -622,6 +623,8 @@ func TestSubmitAttestationsV2(t *testing.T) {
 		HeadFetcher: chainService,
 		ChainInfoFetcher: chainService,
 		TimeFetcher: chainService,
+		OptimisticModeFetcher: chainService,
+		SyncChecker: &mockSync.Sync{IsSyncing: false},
 		OperationNotifier: &blockchainmock.MockOperationNotifier{},
 		AttestationStateFetcher: chainService,
 	}
@@ -654,6 +657,7 @@
 		assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
 		assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
 		assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
+		time.Sleep(100 * time.Millisecond) // Wait for async pool save
 		assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
 	})
 	t.Run("multiple", func(t *testing.T) {
@@ -673,6 +677,7 @@
 		assert.Equal(t, http.StatusOK, writer.Code)
 		assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
 		assert.Equal(t, 2, broadcaster.NumAttestations())
+		time.Sleep(100 * time.Millisecond) // Wait for async pool save
 		assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
 	})
 	t.Run("phase0 att post electra", func(t *testing.T) {
@@ -793,6 +798,7 @@
 		assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
 		assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
 		assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
+		time.Sleep(100 * time.Millisecond) // Wait for async pool save
 		assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
 	})
 	t.Run("multiple", func(t *testing.T) {
@@ -812,6 +818,7 @@
 		assert.Equal(t, http.StatusOK, writer.Code)
 		assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
 		assert.Equal(t, 2, broadcaster.NumAttestations())
+		time.Sleep(100 * time.Millisecond) // Wait for async pool save
 		assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
 	})
 	t.Run("no body", func(t *testing.T) {
@@ -861,6 +868,27 @@ func TestSubmitAttestationsV2(t *testing.T) {
 			assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature"))
 		})
 	})
+	t.Run("syncing", func(t *testing.T) {
+		chainService := &blockchainmock.ChainService{}
+		s := &Server{
+			HeadFetcher: chainService,
+			TimeFetcher: chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker: &mockSync.Sync{IsSyncing: true},
+		}
+
+		var body bytes.Buffer
+		_, err := body.WriteString(singleAtt)
+		require.NoError(t, err)
+		request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
+		request.Header.Set(api.VersionHeader, version.String(version.Phase0))
+		writer := httptest.NewRecorder()
+		writer.Body = &bytes.Buffer{}
+
+		s.SubmitAttestationsV2(writer, request)
+		assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
+		assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
+	})
 }
 
 func TestListVoluntaryExits(t *testing.T) {
@@ -1057,14 +1085,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 	t.Run("single", func(t *testing.T) {
 		broadcaster := &p2pMock.MockBroadcaster{}
+		chainService := &blockchainmock.ChainService{
+			State: st,
+			SyncCommitteeIndices: []primitives.CommitteeIndex{0},
+		}
 		s := &Server{
+			HeadFetcher: chainService,
+			TimeFetcher: chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker: &mockSync.Sync{IsSyncing: false},
 			CoreService: &core.Service{
 				SyncCommitteePool: synccommittee.NewStore(),
 				P2P: broadcaster,
-				HeadFetcher: &blockchainmock.ChainService{
-					State: st,
-					SyncCommitteeIndices: []primitives.CommitteeIndex{0},
-				},
+				HeadFetcher: chainService,
 			},
 		}
@@ -1089,14 +1122,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 	})
 	t.Run("multiple", func(t *testing.T) {
 		broadcaster := &p2pMock.MockBroadcaster{}
+		chainService := &blockchainmock.ChainService{
+			State: st,
+			SyncCommitteeIndices: []primitives.CommitteeIndex{0},
+		}
 		s := &Server{
+			HeadFetcher: chainService,
+			TimeFetcher: chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker: &mockSync.Sync{IsSyncing: false},
 			CoreService: &core.Service{
 				SyncCommitteePool: synccommittee.NewStore(),
 				P2P: broadcaster,
-				HeadFetcher: &blockchainmock.ChainService{
-					State: st,
-					SyncCommitteeIndices: []primitives.CommitteeIndex{0},
-				},
+				HeadFetcher: chainService,
 			},
 		}
@@ -1120,13 +1158,18 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 	})
 	t.Run("invalid", func(t *testing.T) {
 		broadcaster := &p2pMock.MockBroadcaster{}
+		chainService := &blockchainmock.ChainService{
+			State: st,
+		}
 		s := &Server{
+			HeadFetcher: chainService,
+			TimeFetcher: chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker: &mockSync.Sync{IsSyncing: false},
 			CoreService: &core.Service{
 				SyncCommitteePool: synccommittee.NewStore(),
 				P2P: broadcaster,
-				HeadFetcher: &blockchainmock.ChainService{
-					State: st,
-				},
+				HeadFetcher: chainService,
 			},
 		}
@@ -1149,7 +1192,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 		assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
 	})
 	t.Run("empty", func(t *testing.T) {
-		s := &Server{}
+		chainService := &blockchainmock.ChainService{State: st}
+		s := &Server{
+			HeadFetcher: chainService,
+			TimeFetcher: chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker: &mockSync.Sync{IsSyncing: false},
+		}
 
 		var body bytes.Buffer
 		_, err := body.WriteString("[]")
@@ -1166,7 +1215,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 		assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
 	})
 	t.Run("no body", func(t *testing.T) {
-		s := &Server{}
+		chainService := &blockchainmock.ChainService{State: st}
+		s := &Server{
+			HeadFetcher: chainService,
+			TimeFetcher: chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker: &mockSync.Sync{IsSyncing: false},
+		}
 
 		request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
 		writer := httptest.NewRecorder()
@@ -1179,6 +1234,26 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 		assert.Equal(t, http.StatusBadRequest, e.Code)
 		assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
 	})
+	t.Run("syncing", func(t *testing.T) {
+		chainService := &blockchainmock.ChainService{State: st}
+		s := &Server{
+			HeadFetcher: chainService,
+			TimeFetcher: chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker: &mockSync.Sync{IsSyncing: true},
+		}
+
+		var body bytes.Buffer
+		_, err := body.WriteString(singleSyncCommitteeMsg)
+		require.NoError(t, err)
+		request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
+		writer := httptest.NewRecorder()
+		writer.Body = &bytes.Buffer{}
+
+		s.SubmitSyncCommitteeSignatures(writer, request)
+		assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
+		assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
+	})
 }
 
 func TestListBLSToExecutionChanges(t *testing.T) {
diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go b/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go
index d23cd25186..e4e386c223 100644
--- a/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go
+++ b/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go
@@ -52,24 +52,27 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
 	ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation")
 	defer span.End()
 
+	if vs.SyncChecker.Syncing() {
+		return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
+	}
+
 	resp, err := vs.proposeAtt(ctx, att, att.GetData().CommitteeIndex)
 	if err != nil {
 		return nil, err
 	}
 
-	if features.Get().EnableExperimentalAttestationPool {
-		if err = vs.AttestationCache.Add(att); err != nil {
-			log.WithError(err).Error("Could not save attestation")
-		}
-	} else {
-		go func() {
+	go func() {
+		if features.Get().EnableExperimentalAttestationPool {
+			if err := vs.AttestationCache.Add(att); err != nil {
+				log.WithError(err).Error("Could not save attestation")
+			}
+		} else {
 			attCopy := att.Copy()
 			if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
 				log.WithError(err).Error("Could not save unaggregated attestation")
-				return
 			}
-		}()
-	}
+		}
+	}()
 
 	return resp, nil
 }
@@ -82,6 +85,10 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
 	ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestationElectra")
 	defer span.End()
 
+	if vs.SyncChecker.Syncing() {
+		return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
+	}
+
 	resp, err := vs.proposeAtt(ctx, singleAtt, singleAtt.GetCommitteeIndex())
 	if err != nil {
 		return nil, err
@@ -98,18 +105,17 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
 	singleAttCopy := singleAtt.Copy()
 	att := singleAttCopy.ToAttestationElectra(committee)
 
-	if features.Get().EnableExperimentalAttestationPool {
-		if err = vs.AttestationCache.Add(att); err != nil {
-			log.WithError(err).Error("Could not save attestation")
-		}
-	} else {
-		go func() {
+	go func() {
+		if features.Get().EnableExperimentalAttestationPool {
+			if err := vs.AttestationCache.Add(att); err != nil {
+				log.WithError(err).Error("Could not save attestation")
+			}
+		} else {
 			if err := vs.AttPool.SaveUnaggregatedAttestation(att); err != nil {
 				log.WithError(err).Error("Could not save unaggregated attestation")
-				return
 			}
-		}()
-	}
+		}
+	}()
 
 	return resp, nil
 }
diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go
index 70916c4ac7..064e3f32b2 100644
--- a/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go
+++ b/beacon-chain/rpc/prysm/v1alpha1/validator/attester_test.go
@@ -38,6 +38,7 @@ func TestProposeAttestation(t *testing.T) {
 		OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
 		TimeFetcher: chainService,
 		AttestationStateFetcher: chainService,
+		SyncChecker: &mockSync.Sync{IsSyncing: false},
 	}
 	head := util.NewBeaconBlock()
 	head.Block.Slot = 999
@@ -141,6 +142,7 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
 		P2P: &mockp2p.MockBroadcaster{},
 		AttPool: attestations.NewPool(),
 		OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
+		SyncChecker: &mockSync.Sync{IsSyncing: false},
 	}
 
 	req := util.HydrateAttestation(&ethpb.Attestation{})
@@ -149,6 +151,37 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
 	assert.ErrorContains(t, wanted, err)
 }
 
+func TestProposeAttestation_Syncing(t *testing.T) {
+	attesterServer := &Server{
+		SyncChecker: &mockSync.Sync{IsSyncing: true},
+	}
+
+	req := util.HydrateAttestation(&ethpb.Attestation{})
+	_, err := attesterServer.ProposeAttestation(t.Context(), req)
+	assert.ErrorContains(t, "Syncing to latest head", err)
+	s, ok := status.FromError(err)
+	require.Equal(t, true, ok)
+	assert.Equal(t, codes.Unavailable, s.Code())
+}
+
+func TestProposeAttestationElectra_Syncing(t *testing.T) {
+	attesterServer := &Server{
+		SyncChecker: &mockSync.Sync{IsSyncing: true},
+	}
+
+	req := &ethpb.SingleAttestation{
+		Data: &ethpb.AttestationData{
+			Source: &ethpb.Checkpoint{Root: make([]byte, 32)},
+			Target: &ethpb.Checkpoint{Root: make([]byte, 32)},
+		},
+	}
+	_, err := attesterServer.ProposeAttestationElectra(t.Context(), req)
+	assert.ErrorContains(t, "Syncing to latest head", err)
+	s, ok := status.FromError(err)
+	require.Equal(t, true, ok)
+	assert.Equal(t, codes.Unavailable, s.Code())
+}
+
 func TestGetAttestationData_OK(t *testing.T) {
 	block := util.NewBeaconBlock()
 	block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1
diff --git a/changelog/james-prysm_align-atter-pool-apis.md b/changelog/james-prysm_align-atter-pool-apis.md
new file mode 100644
index 0000000000..53de89b44b
--- /dev/null
+++ b/changelog/james-prysm_align-atter-pool-apis.md
@@ -0,0 +1,3 @@
+### Changed
+
+- The `/eth/v2/beacon/pool/attestations` and `/eth/v1/beacon/pool/sync_committees` endpoints now return a 503 error if the node is still syncing. The REST API also now matches the gRPC behavior by broadcasting attestations immediately and saving them to the pool asynchronously.
\ No newline at end of file
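
For readers consuming these endpoints, below is a minimal client-side sketch (not part of the patch) of how the new behavior might be handled: a 503 from `/eth/v2/beacon/pool/attestations` now means the beacon node is still syncing, so the submission can be retried rather than treated as a hard failure. The base URL, port, retry count, backoff, and the `electra` version value are illustrative assumptions; only the endpoint path, the requirement of the consensus version header (`api.VersionHeader`), and the 503 semantics come from this change.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
	"time"
)

// submitAttestations posts a JSON array of attestations and retries while the
// node answers 503 (still syncing). Base URL, retry count, and backoff are
// illustrative assumptions, not values defined by this PR.
func submitAttestations(baseURL string, payload []byte) error {
	const maxAttempts = 5
	for attempt := 0; attempt < maxAttempts; attempt++ {
		req, err := http.NewRequest(http.MethodPost, baseURL+"/eth/v2/beacon/pool/attestations", bytes.NewReader(payload))
		if err != nil {
			return err
		}
		req.Header.Set("Content-Type", "application/json")
		// SubmitAttestationsV2 requires the consensus version header; "electra" is an assumed example value.
		req.Header.Set("Eth-Consensus-Version", "electra")

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		resp.Body.Close()

		switch resp.StatusCode {
		case http.StatusOK:
			return nil
		case http.StatusServiceUnavailable:
			// Beacon node is still syncing: back off and retry instead of failing hard.
			time.Sleep(2 * time.Second)
		default:
			return fmt.Errorf("attestation submission failed with status %d", resp.StatusCode)
		}
	}
	return fmt.Errorf("beacon node still syncing after %d attempts", maxAttempts)
}

func main() {
	// Empty list just to exercise the request path; real callers submit signed attestations.
	if err := submitAttestations("http://localhost:3500", []byte(`[]`)); err != nil {
		log.Fatal(err)
	}
}
```

Note that because the handler now saves to the pool asynchronously, a 200 response only confirms validation and broadcast; pool inclusion happens shortly afterwards, which is why the updated tests wait briefly before asserting on the pool count.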