diff --git a/api/client/beacon/client.go b/api/client/beacon/client.go index e5ec0ea243..46fac052c7 100644 --- a/api/client/beacon/client.go +++ b/api/client/beacon/client.go @@ -284,7 +284,7 @@ func (c *Client) SubmitChangeBLStoExecution(ctx context.Context, request []*stru if resp.StatusCode != http.StatusOK { decoder := json.NewDecoder(resp.Body) decoder.DisallowUnknownFields() - errorJson := &server.IndexedVerificationFailureError{} + errorJson := &server.IndexedErrorContainer{} if err := decoder.Decode(errorJson); err != nil { return errors.Wrapf(err, "failed to decode error JSON for %s", resp.Request.URL) } diff --git a/api/server/error.go b/api/server/error.go index feb09facb8..f1e398b40c 100644 --- a/api/server/error.go +++ b/api/server/error.go @@ -6,6 +6,11 @@ import ( "strings" ) +var ( + ErrIndexedValidationFail = "One or more messages failed validation" + ErrIndexedBroadcastFail = "One or more messages failed broadcast" +) + // DecodeError represents an error resulting from trying to decode an HTTP request. // It tracks the full field name for which decoding failed. type DecodeError struct { @@ -29,19 +34,38 @@ func (e *DecodeError) Error() string { return fmt.Sprintf("could not decode %s: %s", strings.Join(e.path, "."), e.err.Error()) } -// IndexedVerificationFailureError wraps a collection of verification failures. -type IndexedVerificationFailureError struct { - Message string `json:"message"` - Code int `json:"code"` - Failures []*IndexedVerificationFailure `json:"failures"` +// IndexedErrorContainer wraps a collection of indexed errors. +type IndexedErrorContainer struct { + Message string `json:"message"` + Code int `json:"code"` + Failures []*IndexedError `json:"failures"` } -func (e *IndexedVerificationFailureError) StatusCode() int { +func (e *IndexedErrorContainer) StatusCode() int { return e.Code } -// IndexedVerificationFailure represents an issue when verifying a single indexed object e.g. an item in an array. -type IndexedVerificationFailure struct { +// IndexedError represents an issue when processing a single indexed object e.g. an item in an array. +type IndexedError struct { Index int `json:"index"` Message string `json:"message"` } + +// BroadcastFailedError represents an error scenario where broadcasting a published message failed. +type BroadcastFailedError struct { + msg string + err error +} + +// NewBroadcastFailedError creates a new instance of BroadcastFailedError. +func NewBroadcastFailedError(msg string, err error) *BroadcastFailedError { + return &BroadcastFailedError{ + msg: msg, + err: err, + } +} + +// Error returns the underlying error message. +func (e *BroadcastFailedError) Error() string { + return fmt.Sprintf("could not broadcast %s: %s", e.msg, e.err.Error()) +} diff --git a/beacon-chain/rpc/core/BUILD.bazel b/beacon-chain/rpc/core/BUILD.bazel index 899f8d89b8..47e85fd9ce 100644 --- a/beacon-chain/rpc/core/BUILD.bazel +++ b/beacon-chain/rpc/core/BUILD.bazel @@ -12,6 +12,7 @@ go_library( importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/rpc/core", visibility = ["//visibility:public"], deps = [ + "//api/server:go_default_library", "//beacon-chain/blockchain:go_default_library", "//beacon-chain/cache:go_default_library", "//beacon-chain/core/altair:go_default_library", diff --git a/beacon-chain/rpc/core/validator.go b/beacon-chain/rpc/core/validator.go index e513fed97a..761e2478f1 100644 --- a/beacon-chain/rpc/core/validator.go +++ b/beacon-chain/rpc/core/validator.go @@ -7,6 +7,7 @@ import ( "sort" "time" + "github.com/OffchainLabs/prysm/v6/api/server" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/epoch/precompute" @@ -36,24 +37,6 @@ import ( var errOptimisticMode = errors.New("the node is currently optimistic and cannot serve validators") -// AggregateBroadcastFailedError represents an error scenario where -// broadcasting an aggregate selection proof failed. -type AggregateBroadcastFailedError struct { - err error -} - -// NewAggregateBroadcastFailedError creates a new error instance. -func NewAggregateBroadcastFailedError(err error) AggregateBroadcastFailedError { - return AggregateBroadcastFailedError{ - err: err, - } -} - -// Error returns the underlying error message. -func (e *AggregateBroadcastFailedError) Error() string { - return fmt.Sprintf("could not broadcast signed aggregated attestation: %s", e.err.Error()) -} - // ComputeValidatorPerformance reports the validator's latest balance along with other important metrics on // rewards and penalties throughout its lifecycle in the beacon chain. func (s *Service) ComputeValidatorPerformance( @@ -360,7 +343,8 @@ func (s *Service) SubmitSignedContributionAndProof( // Wait for p2p broadcast to complete and return the first error (if any) err := errs.Wait() if err != nil { - return &RpcError{Err: err, Reason: Internal} + log.WithError(err).Debug("Could not broadcast signed contribution and proof") + return &RpcError{Err: server.NewBroadcastFailedError("SignedContributionAndProof", err), Reason: Internal} } s.OperationNotifier.OperationFeed().Send(&feed.Event{ @@ -411,7 +395,8 @@ func (s *Service) SubmitSignedAggregateSelectionProof( } if err := s.Broadcaster.Broadcast(ctx, agg); err != nil { - return &RpcError{Err: &AggregateBroadcastFailedError{err: err}, Reason: Internal} + log.WithError(err).Debug("Could not broadcast signed aggregate att and proof") + return &RpcError{Err: server.NewBroadcastFailedError("SignedAggregateAttAndProof", err), Reason: Internal} } if logrus.GetLevel() >= logrus.DebugLevel { diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool.go b/beacon-chain/rpc/eth/beacon/handlers_pool.go index 168519c307..f8ca39e2ea 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool.go @@ -6,8 +6,6 @@ import ( "fmt" "io" "net/http" - "strconv" - "strings" "time" "github.com/OffchainLabs/prysm/v6/api" @@ -31,6 +29,7 @@ import ( "github.com/OffchainLabs/prysm/v6/runtime/version" "github.com/OffchainLabs/prysm/v6/time/slots" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) const broadcastBLSChangesRateLimit = 128 @@ -200,22 +199,23 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) { return } - if len(failedBroadcasts) > 0 { - httputil.HandleError( - w, - fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")), - http.StatusInternalServerError, - ) - return - } - if len(attFailures) > 0 { - failuresErr := &server.IndexedVerificationFailureError{ + failuresErr := &server.IndexedErrorContainer{ Code: http.StatusBadRequest, - Message: "One or more attestations failed validation", + Message: server.ErrIndexedValidationFail, Failures: attFailures, } httputil.WriteError(w, failuresErr) + return + } + if len(failedBroadcasts) > 0 { + failuresErr := &server.IndexedErrorContainer{ + Code: http.StatusInternalServerError, + Message: server.ErrIndexedBroadcastFail, + Failures: failedBroadcasts, + } + httputil.WriteError(w, failuresErr) + return } } @@ -247,8 +247,8 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { return } - var attFailures []*server.IndexedVerificationFailure - var failedBroadcasts []string + var attFailures []*server.IndexedError + var failedBroadcasts []*server.IndexedError if v >= version.Electra { attFailures, failedBroadcasts, err = s.handleAttestationsElectra(ctx, req.Data) @@ -260,29 +260,30 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) { return } - if len(failedBroadcasts) > 0 { - httputil.HandleError( - w, - fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")), - http.StatusInternalServerError, - ) - return - } - if len(attFailures) > 0 { - failuresErr := &server.IndexedVerificationFailureError{ + failuresErr := &server.IndexedErrorContainer{ Code: http.StatusBadRequest, - Message: "One or more attestations failed validation", + Message: server.ErrIndexedValidationFail, Failures: attFailures, } httputil.WriteError(w, failuresErr) + return + } + if len(failedBroadcasts) > 0 { + failuresErr := &server.IndexedErrorContainer{ + Code: http.StatusInternalServerError, + Message: server.ErrIndexedBroadcastFail, + Failures: failedBroadcasts, + } + httputil.WriteError(w, failuresErr) + return } } func (s *Server) handleAttestationsElectra( ctx context.Context, data json.RawMessage, -) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) { +) (attFailures []*server.IndexedError, failedBroadcasts []*server.IndexedError, err error) { var sourceAttestations []*structs.SingleAttestation currentEpoch := slots.ToEpoch(s.TimeFetcher.CurrentSlot()) if currentEpoch < params.BeaconConfig().ElectraForkEpoch { @@ -301,14 +302,14 @@ func (s *Server) handleAttestationsElectra( for i, sourceAtt := range sourceAttestations { att, err := sourceAtt.ToConsensus() if err != nil { - attFailures = append(attFailures, &server.IndexedVerificationFailure{ + attFailures = append(attFailures, &server.IndexedError{ Index: i, Message: "Could not convert request attestation to consensus attestation: " + err.Error(), }) continue } if _, err = bls.SignatureFromBytes(att.Signature); err != nil { - attFailures = append(attFailures, &server.IndexedVerificationFailure{ + attFailures = append(attFailures, &server.IndexedError{ Index: i, Message: "Incorrect attestation signature: " + err.Error(), }) @@ -317,6 +318,13 @@ func (s *Server) handleAttestationsElectra( validAttestations = append(validAttestations, att) } + // We store the error for the first failed broadcast and use it in the log message in case + // there are broadcast issues. Having a single log at the end instead of logging + // for every failed broadcast prevents log noise in case there are many failures. + // Even though we only retain the first error, there is a very good chance that all + // broadcasts fail for the same reason, so this should be sufficient in most cases. + var broadcastErr error + for i, singleAtt := range validAttestations { s.OperationNotifier.OperationFeed().Send(&feed.Event{ Type: operation.SingleAttReceived, @@ -338,31 +346,45 @@ func (s *Server) handleAttestationsElectra( wantedEpoch := slots.ToEpoch(att.Data.Slot) vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) if err != nil { - failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) - continue + return nil, nil, errors.Wrap(err, "could not get head validator indices") } subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot) if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil { - log.WithError(err).Errorf("could not broadcast attestation at index %d", i) - failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) + failedBroadcasts = append(failedBroadcasts, &server.IndexedError{ + Index: i, + Message: server.NewBroadcastFailedError("SingleAttestation", err).Error(), + }) + if broadcastErr == nil { + broadcastErr = err + } continue } if features.Get().EnableExperimentalAttestationPool { if err = s.AttestationCache.Add(att); err != nil { - log.WithError(err).Error("could not save attestation") + log.WithError(err).Error("Could not save attestation") } } else { if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { - log.WithError(err).Error("could not save attestation") + log.WithError(err).Error("Could not save attestation") } } } + if len(failedBroadcasts) > 0 { + log.WithFields(logrus.Fields{ + "failedCount": len(failedBroadcasts), + "totalCount": len(validAttestations), + }).WithError(broadcastErr).Error("Some attestations failed to be broadcast") + } + return attFailures, failedBroadcasts, nil } -func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) { +func (s *Server) handleAttestations( + ctx context.Context, + data json.RawMessage, +) (attFailures []*server.IndexedError, failedBroadcasts []*server.IndexedError, err error) { var sourceAttestations []*structs.Attestation if slots.ToEpoch(s.TimeFetcher.CurrentSlot()) >= params.BeaconConfig().ElectraForkEpoch { @@ -381,14 +403,14 @@ func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) ( for i, sourceAtt := range sourceAttestations { att, err := sourceAtt.ToConsensus() if err != nil { - attFailures = append(attFailures, &server.IndexedVerificationFailure{ + attFailures = append(attFailures, &server.IndexedError{ Index: i, Message: "Could not convert request attestation to consensus attestation: " + err.Error(), }) continue } if _, err = bls.SignatureFromBytes(att.Signature); err != nil { - attFailures = append(attFailures, &server.IndexedVerificationFailure{ + attFailures = append(attFailures, &server.IndexedError{ Index: i, Message: "Incorrect attestation signature: " + err.Error(), }) @@ -397,6 +419,13 @@ func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) ( validAttestations = append(validAttestations, att) } + // We store the error for the first failed broadcast and use it in the log message in case + // there are broadcast issues. Having a single log at the end instead of logging + // for every failed broadcast prevents log noise in case there are many failures. + // Even though we only retain the first error, there is a very good chance that all + // broadcasts fail for the same reason, so this should be sufficient in most cases. + var broadcastErr error + for i, att := range validAttestations { // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node // of a received unaggregated attestation. @@ -413,32 +442,43 @@ func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) ( wantedEpoch := slots.ToEpoch(att.Data.Slot) vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) if err != nil { - failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) - continue + return nil, nil, errors.Wrap(err, "could not get head validator indices") } subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil { - log.WithError(err).Errorf("could not broadcast attestation at index %d", i) - failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i)) + failedBroadcasts = append(failedBroadcasts, &server.IndexedError{ + Index: i, + Message: server.NewBroadcastFailedError("Attestation", err).Error(), + }) + if broadcastErr == nil { + broadcastErr = err + } continue } if features.Get().EnableExperimentalAttestationPool { if err = s.AttestationCache.Add(att); err != nil { - log.WithError(err).Error("could not save attestation") + log.WithError(err).Error("Could not save attestation") } } else if att.IsAggregated() { if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil { - log.WithError(err).Error("could not save aggregated attestation") + log.WithError(err).Error("Could not save aggregated attestation") } } else { if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil { - log.WithError(err).Error("could not save unaggregated attestation") + log.WithError(err).Error("Could not save unaggregated attestation") } } } + if len(failedBroadcasts) > 0 { + log.WithFields(logrus.Fields{ + "failedCount": len(failedBroadcasts), + "totalCount": len(validAttestations), + }).WithError(broadcastErr).Error("Some attestations failed to be broadcast") + } + return attFailures, failedBroadcasts, nil } @@ -541,11 +581,11 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re } var validMessages []*eth.SyncCommitteeMessage - var msgFailures []*server.IndexedVerificationFailure + var msgFailures []*server.IndexedError for i, sourceMsg := range req.Data { msg, err := sourceMsg.ToConsensus() if err != nil { - msgFailures = append(msgFailures, &server.IndexedVerificationFailure{ + msgFailures = append(msgFailures, &server.IndexedError{ Index: i, Message: "Could not convert request message to consensus message: " + err.Error(), }) @@ -562,7 +602,7 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re } if len(msgFailures) > 0 { - failuresErr := &server.IndexedVerificationFailureError{ + failuresErr := &server.IndexedErrorContainer{ Code: http.StatusBadRequest, Message: "One or more messages failed validation", Failures: msgFailures, @@ -581,7 +621,7 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ httputil.HandleError(w, fmt.Sprintf("Could not get head state: %v", err), http.StatusInternalServerError) return } - var failures []*server.IndexedVerificationFailure + var failures []*server.IndexedError var toBroadcast []*eth.SignedBLSToExecutionChange var req []*structs.SignedBLSToExecutionChange @@ -602,7 +642,7 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ for i, change := range req { sbls, err := change.ToConsensus() if err != nil { - failures = append(failures, &server.IndexedVerificationFailure{ + failures = append(failures, &server.IndexedError{ Index: i, Message: "Unable to decode SignedBLSToExecutionChange: " + err.Error(), }) @@ -610,14 +650,14 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ } _, err = blocks.ValidateBLSToExecutionChange(st, sbls) if err != nil { - failures = append(failures, &server.IndexedVerificationFailure{ + failures = append(failures, &server.IndexedError{ Index: i, Message: "Could not validate SignedBLSToExecutionChange: " + err.Error(), }) continue } if err := blocks.VerifyBLSChangeSignature(st, sbls); err != nil { - failures = append(failures, &server.IndexedVerificationFailure{ + failures = append(failures, &server.IndexedError{ Index: i, Message: "Could not validate signature: " + err.Error(), }) @@ -636,9 +676,9 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ } go s.broadcastBLSChanges(context.Background(), toBroadcast) if len(failures) > 0 { - failuresErr := &server.IndexedVerificationFailureError{ + failuresErr := &server.IndexedErrorContainer{ Code: http.StatusBadRequest, - Message: "One or more BLSToExecutionChange failed validation", + Message: server.ErrIndexedValidationFail, Failures: failures, } httputil.WriteError(w, failuresErr) @@ -655,18 +695,18 @@ func (s *Server) broadcastBLSBatch(ctx context.Context, ptr *[]*eth.SignedBLSToE } st, err := s.ChainInfoFetcher.HeadStateReadOnly(ctx) if err != nil { - log.WithError(err).Error("could not get head state") + log.WithError(err).Error("Could not get head state") return } for _, ch := range (*ptr)[:limit] { if ch != nil { _, err := blocks.ValidateBLSToExecutionChange(st, ch) if err != nil { - log.WithError(err).Error("could not validate BLS to execution change") + log.WithError(err).Error("Could not validate BLS to execution change") continue } if err := s.Broadcaster.Broadcast(ctx, ch); err != nil { - log.WithError(err).Error("could not broadcast BLS to execution changes.") + log.WithError(err).Error("Could not broadcast BLS to execution changes.") } } } diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go index d3b1a07672..4fb0c7d238 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go @@ -638,7 +638,7 @@ func TestSubmitAttestations(t *testing.T) { s.SubmitAttestations(writer, request) assert.Equal(t, http.StatusBadRequest, writer.Code) - e := &server.IndexedVerificationFailureError{} + e := &server.IndexedErrorContainer{} require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) assert.Equal(t, http.StatusBadRequest, e.Code) require.Equal(t, 1, len(e.Failures)) @@ -772,7 +772,7 @@ func TestSubmitAttestations(t *testing.T) { s.SubmitAttestationsV2(writer, request) assert.Equal(t, http.StatusBadRequest, writer.Code) - e := &server.IndexedVerificationFailureError{} + e := &server.IndexedErrorContainer{} require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) assert.Equal(t, http.StatusBadRequest, e.Code) require.Equal(t, 1, len(e.Failures)) @@ -873,7 +873,7 @@ func TestSubmitAttestations(t *testing.T) { s.SubmitAttestationsV2(writer, request) assert.Equal(t, http.StatusBadRequest, writer.Code) - e := &server.IndexedVerificationFailureError{} + e := &server.IndexedErrorContainer{} require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e)) assert.Equal(t, http.StatusBadRequest, e.Code) require.Equal(t, 1, len(e.Failures)) @@ -1538,7 +1538,7 @@ func TestSubmitSignedBLSToExecutionChanges_Failures(t *testing.T) { s.SubmitBLSToExecutionChanges(writer, request) assert.Equal(t, http.StatusBadRequest, writer.Code) time.Sleep(10 * time.Millisecond) // Delay to allow the routine to start - require.StringContains(t, "One or more BLSToExecutionChange failed validation", writer.Body.String()) + require.StringContains(t, "One or more messages failed validation", writer.Body.String()) assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)+1) diff --git a/beacon-chain/rpc/eth/validator/BUILD.bazel b/beacon-chain/rpc/eth/validator/BUILD.bazel index f4d7f416a7..a965ef4aee 100644 --- a/beacon-chain/rpc/eth/validator/BUILD.bazel +++ b/beacon-chain/rpc/eth/validator/BUILD.bazel @@ -12,6 +12,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//api:go_default_library", + "//api/server:go_default_library", "//api/server/structs:go_default_library", "//beacon-chain/blockchain:go_default_library", "//beacon-chain/builder:go_default_library", diff --git a/beacon-chain/rpc/eth/validator/handlers.go b/beacon-chain/rpc/eth/validator/handlers.go index 7be36698d7..043ad17bb8 100644 --- a/beacon-chain/rpc/eth/validator/handlers.go +++ b/beacon-chain/rpc/eth/validator/handlers.go @@ -14,6 +14,7 @@ import ( "time" "github.com/OffchainLabs/prysm/v6/api" + "github.com/OffchainLabs/prysm/v6/api/server" "github.com/OffchainLabs/prysm/v6/api/server/structs" "github.com/OffchainLabs/prysm/v6/beacon-chain/builder" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" @@ -268,22 +269,61 @@ func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Requ return } - for _, item := range reqData { + var failures []*server.IndexedError + var failedBroadcasts []*server.IndexedError + + for i, item := range reqData { var contribution structs.SignedContributionAndProof if err := json.Unmarshal(item, &contribution); err != nil { - httputil.HandleError(w, "Could not decode item: "+err.Error(), http.StatusBadRequest) - return + failures = append(failures, &server.IndexedError{ + Index: i, + Message: "Could not unmarshal message: " + err.Error(), + }) + continue } consensusItem, err := contribution.ToConsensus() if err != nil { - httputil.HandleError(w, "Could not convert contribution to consensus format: "+err.Error(), http.StatusBadRequest) - return + failures = append(failures, &server.IndexedError{ + Index: i, + Message: "Could not convert request contribution to consensus contribution: " + err.Error(), + }) + continue } - if rpcError := s.CoreService.SubmitSignedContributionAndProof(ctx, consensusItem); rpcError != nil { - httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason)) - return + + rpcError := s.CoreService.SubmitSignedContributionAndProof(ctx, consensusItem) + if rpcError != nil { + var broadcastFailedErr *server.BroadcastFailedError + if errors.As(rpcError.Err, &broadcastFailedErr) { + failedBroadcasts = append(failedBroadcasts, &server.IndexedError{ + Index: i, + Message: rpcError.Err.Error(), + }) + continue + } else { + httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason)) + return + } } } + + if len(failures) > 0 { + failuresErr := &server.IndexedErrorContainer{ + Code: http.StatusBadRequest, + Message: server.ErrIndexedValidationFail, + Failures: failures, + } + httputil.WriteError(w, failuresErr) + return + } + if len(failedBroadcasts) > 0 { + failuresErr := &server.IndexedErrorContainer{ + Code: http.StatusInternalServerError, + Message: server.ErrIndexedBroadcastFail, + Failures: failedBroadcasts, + } + httputil.WriteError(w, failuresErr) + return + } } // Deprecated: use SubmitAggregateAndProofsV2 instead @@ -322,8 +362,8 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request } rpcError := s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem) if rpcError != nil { - var aggregateBroadcastFailedError *core.AggregateBroadcastFailedError - ok := errors.As(rpcError.Err, &aggregateBroadcastFailedError) + var broadcastFailedErr *server.BroadcastFailedError + ok := errors.As(rpcError.Err, &broadcastFailedErr) if ok { broadcastFailed = true } else { @@ -368,49 +408,83 @@ func (s *Server) SubmitAggregateAndProofsV2(w http.ResponseWriter, r *http.Reque return } - broadcastFailed := false + var failures []*server.IndexedError + var failedBroadcasts []*server.IndexedError + var rpcError *core.RpcError - for _, raw := range reqData { + for i, raw := range reqData { if v >= version.Electra { var signedAggregate structs.SignedAggregateAttestationAndProofElectra err = json.Unmarshal(raw, &signedAggregate) if err != nil { - httputil.HandleError(w, "Failed to parse aggregate attestation and proof: "+err.Error(), http.StatusBadRequest) - return + failures = append(failures, &server.IndexedError{ + Index: i, + Message: "Could not parse message: " + err.Error(), + }) + continue } consensusItem, err := signedAggregate.ToConsensus() if err != nil { - httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest) - return + failures = append(failures, &server.IndexedError{ + Index: i, + Message: "Could not convert request aggregate to consensus aggregate: " + err.Error(), + }) + continue } rpcError = s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem) } else { var signedAggregate structs.SignedAggregateAttestationAndProof err = json.Unmarshal(raw, &signedAggregate) if err != nil { - httputil.HandleError(w, "Failed to parse aggregate attestation and proof: "+err.Error(), http.StatusBadRequest) - return + failures = append(failures, &server.IndexedError{ + Index: i, + Message: "Could not parse message: " + err.Error(), + }) + continue } consensusItem, err := signedAggregate.ToConsensus() if err != nil { - httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest) - return + failures = append(failures, &server.IndexedError{ + Index: i, + Message: "Could not convert request aggregate to consensus aggregate: " + err.Error(), + }) + continue } rpcError = s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem) } if rpcError != nil { - var aggregateBroadcastFailedError *core.AggregateBroadcastFailedError - if errors.As(rpcError.Err, &aggregateBroadcastFailedError) { - broadcastFailed = true + var broadcastFailedErr *server.BroadcastFailedError + if errors.As(rpcError.Err, &broadcastFailedErr) { + failedBroadcasts = append(failedBroadcasts, &server.IndexedError{ + Index: i, + Message: rpcError.Err.Error(), + }) + continue } else { httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason)) return } } } - if broadcastFailed { - httputil.HandleError(w, "Could not broadcast one or more signed aggregated attestations", http.StatusInternalServerError) + + if len(failures) > 0 { + failuresErr := &server.IndexedErrorContainer{ + Code: http.StatusBadRequest, + Message: server.ErrIndexedValidationFail, + Failures: failures, + } + httputil.WriteError(w, failuresErr) + return + } + if len(failedBroadcasts) > 0 { + failuresErr := &server.IndexedErrorContainer{ + Code: http.StatusInternalServerError, + Message: server.ErrIndexedBroadcastFail, + Failures: failedBroadcasts, + } + httputil.WriteError(w, failuresErr) + return } } diff --git a/changelog/radek_api-individual-failure.md b/changelog/radek_api-individual-failure.md new file mode 100644 index 0000000000..26480c06fc --- /dev/null +++ b/changelog/radek_api-individual-failure.md @@ -0,0 +1,3 @@ +### Changed + +- Improve returning individual message errors from Beacon API. \ No newline at end of file diff --git a/cmd/prysmctl/validator/withdraw_test.go b/cmd/prysmctl/validator/withdraw_test.go index 1b8d040c49..dcc4a61339 100644 --- a/cmd/prysmctl/validator/withdraw_test.go +++ b/cmd/prysmctl/validator/withdraw_test.go @@ -219,8 +219,8 @@ func TestCallWithdrawalEndpoint_Errors(t *testing.T) { if r.Method == http.MethodPost && r.RequestURI == "/eth/v1/beacon/pool/bls_to_execution_changes" { w.WriteHeader(400) w.Header().Set("Content-Type", "application/json") - err = json.NewEncoder(w).Encode(&server.IndexedVerificationFailureError{ - Failures: []*server.IndexedVerificationFailure{ + err = json.NewEncoder(w).Encode(&server.IndexedErrorContainer{ + Failures: []*server.IndexedError{ {Index: 0, Message: "Could not validate SignedBLSToExecutionChange"}, }, })