Compare commits

...

5 Commits

Author SHA1 Message Date
Kasey Kirkham
c30a1f9aa0 proof of concept for safe slot conversion without error handling 2025-10-14 14:31:14 -05:00
Radosław Kapka
683608e34a Improve returning individual message errors from Beacon API (#15835)
* Improve returning individual message errors from Beacon API

* changelog <3

* fix test

* add debug logs

* batch broadcast errors

* use logrus fields

* capitalize log messages
2025-10-14 15:22:00 +00:00
Manu NALEPA
fbbf2a1404 HasAtLeastOneIndex: Check the index is not too high. (#15865) 2025-10-14 14:39:38 +00:00
Potuz
82f556c50f Remove redundant check (#15844)
* Remove redundant check

* changelog

* fix gazelle
2025-10-14 12:39:19 +00:00
Radosław Kapka
c88aa77ac1 Display non-JSON error messages (#15860)
* Display non-JSON error messages

* changelog <3
2025-10-14 12:08:21 +00:00
23 changed files with 344 additions and 148 deletions

View File

@@ -284,7 +284,7 @@ func (c *Client) SubmitChangeBLStoExecution(ctx context.Context, request []*stru
if resp.StatusCode != http.StatusOK {
decoder := json.NewDecoder(resp.Body)
decoder.DisallowUnknownFields()
errorJson := &server.IndexedVerificationFailureError{}
errorJson := &server.IndexedErrorContainer{}
if err := decoder.Decode(errorJson); err != nil {
return errors.Wrapf(err, "failed to decode error JSON for %s", resp.Request.URL)
}

View File

@@ -6,6 +6,11 @@ import (
"strings"
)
var (
ErrIndexedValidationFail = "One or more messages failed validation"
ErrIndexedBroadcastFail = "One or more messages failed broadcast"
)
// DecodeError represents an error resulting from trying to decode an HTTP request.
// It tracks the full field name for which decoding failed.
type DecodeError struct {
@@ -29,19 +34,38 @@ func (e *DecodeError) Error() string {
return fmt.Sprintf("could not decode %s: %s", strings.Join(e.path, "."), e.err.Error())
}
// IndexedVerificationFailureError wraps a collection of verification failures.
type IndexedVerificationFailureError struct {
Message string `json:"message"`
Code int `json:"code"`
Failures []*IndexedVerificationFailure `json:"failures"`
// IndexedErrorContainer wraps a collection of indexed errors.
type IndexedErrorContainer struct {
Message string `json:"message"`
Code int `json:"code"`
Failures []*IndexedError `json:"failures"`
}
func (e *IndexedVerificationFailureError) StatusCode() int {
func (e *IndexedErrorContainer) StatusCode() int {
return e.Code
}
// IndexedVerificationFailure represents an issue when verifying a single indexed object e.g. an item in an array.
type IndexedVerificationFailure struct {
// IndexedError represents an issue when processing a single indexed object e.g. an item in an array.
type IndexedError struct {
Index int `json:"index"`
Message string `json:"message"`
}
// BroadcastFailedError represents an error scenario where broadcasting a published message failed.
type BroadcastFailedError struct {
msg string
err error
}
// NewBroadcastFailedError creates a new instance of BroadcastFailedError.
func NewBroadcastFailedError(msg string, err error) *BroadcastFailedError {
return &BroadcastFailedError{
msg: msg,
err: err,
}
}
// Error returns the underlying error message.
func (e *BroadcastFailedError) Error() string {
return fmt.Sprintf("could not broadcast %s: %s", e.msg, e.err.Error())
}
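
The two additions above are meant to work together: core services wrap p2p failures in *BroadcastFailedError, and the HTTP handlers later in this diff pick them out with errors.As and report them per index through IndexedErrorContainer. A minimal runnable sketch of that flow, assuming only the types added here (the broadcast stub and its "no peers" error are hypothetical):

package main

import (
	"errors"
	"fmt"

	"github.com/OffchainLabs/prysm/v6/api/server"
)

// broadcast is a hypothetical stand-in for a p2p broadcast that fails.
func broadcast(msg string) error {
	return server.NewBroadcastFailedError(msg, errors.New("no peers"))
}

func main() {
	var failed []*server.IndexedError
	for i, msg := range []string{"Attestation", "SignedContributionAndProof"} {
		err := broadcast(msg)
		var bErr *server.BroadcastFailedError
		if errors.As(err, &bErr) {
			// Broadcast failures are collected per index instead of aborting the whole request.
			failed = append(failed, &server.IndexedError{Index: i, Message: bErr.Error()})
		}
	}
	fmt.Println(failed[0].Message)
	// Output: could not broadcast Attestation: no peers
}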

View File

@@ -35,8 +35,9 @@ func (s DataColumnStorageSummary) HasIndex(index uint64) bool {
// HasAtLeastOneIndex returns true if at least one of the DataColumnSidecars at the given indices is available in the filesystem.
func (s DataColumnStorageSummary) HasAtLeastOneIndex(indices []uint64) bool {
size := uint64(len(s.mask))
for _, index := range indices {
if s.mask[index] {
if index < size && s.mask[index] {
return true
}
}
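
Without the size guard, an index greater than or equal to the mask length would panic with an index-out-of-range error; with it, such an index is simply treated as not stored. A standalone sketch of the guarded lookup, using plain slices instead of the DataColumnStorageSummary type:

package main

import "fmt"

// hasAtLeastOne mirrors the guarded lookup: out-of-range indices are skipped
// rather than causing an index-out-of-range panic.
func hasAtLeastOne(mask []bool, indices []uint64) bool {
	size := uint64(len(mask))
	for _, index := range indices {
		if index < size && mask[index] {
			return true
		}
	}
	return false
}

func main() {
	mask := []bool{false, true}
	fmt.Println(hasAtLeastOne(mask, []uint64{3, 1, 128})) // true: index 1 is set, 128 is out of range and ignored
	fmt.Println(hasAtLeastOne(mask, []uint64{3, 4, 128})) // false: nothing in range is set
}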

View File

@@ -25,11 +25,11 @@ func TestHasIndex(t *testing.T) {
func TestHasAtLeastOneIndex(t *testing.T) {
summary := NewDataColumnStorageSummary(0, [fieldparams.NumberOfColumns]bool{false, true})
hasAtLeastOneIndex := summary.HasAtLeastOneIndex([]uint64{3, 1, 2})
require.Equal(t, true, hasAtLeastOneIndex)
actual := summary.HasAtLeastOneIndex([]uint64{3, 1, fieldparams.NumberOfColumns, 2})
require.Equal(t, true, actual)
hasAtLeastOneIndex = summary.HasAtLeastOneIndex([]uint64{3, 4, 2})
require.Equal(t, false, hasAtLeastOneIndex)
actual = summary.HasAtLeastOneIndex([]uint64{3, 4, fieldparams.NumberOfColumns, 2})
require.Equal(t, false, actual)
}
func TestCount(t *testing.T) {

View File

@@ -58,7 +58,6 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/slice:go_default_library",
"//encoding/bytesutil:go_default_library",
"//genesis:go_default_library",
"//monitoring/prometheus:go_default_library",
"//monitoring/tracing:go_default_library",

View File

@@ -60,7 +60,6 @@ import (
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/container/slice"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/genesis"
"github.com/OffchainLabs/prysm/v6/monitoring/prometheus"
"github.com/OffchainLabs/prysm/v6/runtime"
@@ -598,22 +597,7 @@ func (b *BeaconNode) startStateGen(ctx context.Context, bfs coverage.AvailableBl
return err
}
r := bytesutil.ToBytes32(cp.Root)
// Consider edge case where finalized root are zeros instead of genesis root hash.
if r == params.BeaconConfig().ZeroHash {
genesisBlock, err := b.db.GenesisBlock(ctx)
if err != nil {
return err
}
if genesisBlock != nil && !genesisBlock.IsNil() {
r, err = genesisBlock.Block().HashTreeRoot()
if err != nil {
return err
}
}
}
b.finalizedStateAtStartUp, err = sg.StateByRoot(ctx, r)
b.finalizedStateAtStartUp, err = sg.StateByRoot(ctx, [32]byte(cp.Root))
if err != nil {
return err
}
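
The deleted branch only mattered when the stored finalized root was all zeros; with it gone, the checkpoint root slice goes through a direct slice-to-array conversion instead of bytesutil.ToBytes32. A small illustration of that conversion, separate from the node code (values are made up):

package main

import "fmt"

func main() {
	root := make([]byte, 32) // stands in for cp.Root, which is expected to hold 32 bytes
	root[0] = 0xaa

	// Go 1.20+ slice-to-array conversion: panics if len(root) < 32,
	// whereas bytesutil.ToBytes32 would zero-pad shorter input.
	r := [32]byte(root)
	fmt.Printf("%#x\n", r[:2]) // 0xaa00
}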

View File

@@ -12,6 +12,7 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/rpc/core",
visibility = ["//visibility:public"],
deps = [
"//api/server:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"sort"
"time"
"github.com/OffchainLabs/prysm/v6/api/server"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/epoch/precompute"
@@ -36,24 +37,6 @@ import (
var errOptimisticMode = errors.New("the node is currently optimistic and cannot serve validators")
// AggregateBroadcastFailedError represents an error scenario where
// broadcasting an aggregate selection proof failed.
type AggregateBroadcastFailedError struct {
err error
}
// NewAggregateBroadcastFailedError creates a new error instance.
func NewAggregateBroadcastFailedError(err error) AggregateBroadcastFailedError {
return AggregateBroadcastFailedError{
err: err,
}
}
// Error returns the underlying error message.
func (e *AggregateBroadcastFailedError) Error() string {
return fmt.Sprintf("could not broadcast signed aggregated attestation: %s", e.err.Error())
}
// ComputeValidatorPerformance reports the validator's latest balance along with other important metrics on
// rewards and penalties throughout its lifecycle in the beacon chain.
func (s *Service) ComputeValidatorPerformance(
@@ -360,7 +343,8 @@ func (s *Service) SubmitSignedContributionAndProof(
// Wait for p2p broadcast to complete and return the first error (if any)
err := errs.Wait()
if err != nil {
return &RpcError{Err: err, Reason: Internal}
log.WithError(err).Debug("Could not broadcast signed contribution and proof")
return &RpcError{Err: server.NewBroadcastFailedError("SignedContributionAndProof", err), Reason: Internal}
}
s.OperationNotifier.OperationFeed().Send(&feed.Event{
@@ -411,7 +395,8 @@ func (s *Service) SubmitSignedAggregateSelectionProof(
}
if err := s.Broadcaster.Broadcast(ctx, agg); err != nil {
return &RpcError{Err: &AggregateBroadcastFailedError{err: err}, Reason: Internal}
log.WithError(err).Debug("Could not broadcast signed aggregate att and proof")
return &RpcError{Err: server.NewBroadcastFailedError("SignedAggregateAttAndProof", err), Reason: Internal}
}
if logrus.GetLevel() >= logrus.DebugLevel {

View File

@@ -6,8 +6,6 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/OffchainLabs/prysm/v6/api"
@@ -31,6 +29,7 @@ import (
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const broadcastBLSChangesRateLimit = 128
@@ -200,22 +199,23 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) {
return
}
if len(failedBroadcasts) > 0 {
httputil.HandleError(
w,
fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")),
http.StatusInternalServerError,
)
return
}
if len(attFailures) > 0 {
failuresErr := &server.IndexedVerificationFailureError{
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusBadRequest,
Message: "One or more attestations failed validation",
Message: server.ErrIndexedValidationFail,
Failures: attFailures,
}
httputil.WriteError(w, failuresErr)
return
}
if len(failedBroadcasts) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusInternalServerError,
Message: server.ErrIndexedBroadcastFail,
Failures: failedBroadcasts,
}
httputil.WriteError(w, failuresErr)
return
}
}
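
With this change, a partial broadcast failure no longer collapses into a plain-text 500; clients receive the same indexed structure used for validation failures, just with a 500 code and the broadcast message. A sketch of the response body built with the types from this diff (the failure message is illustrative):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/OffchainLabs/prysm/v6/api/server"
)

func main() {
	body := &server.IndexedErrorContainer{
		Code:    http.StatusInternalServerError,
		Message: server.ErrIndexedBroadcastFail,
		Failures: []*server.IndexedError{
			{Index: 2, Message: "could not broadcast Attestation: no peers"},
		},
	}
	out, _ := json.MarshalIndent(body, "", "  ")
	fmt.Println(string(out))
	// {
	//   "message": "One or more messages failed broadcast",
	//   "code": 500,
	//   "failures": [
	//     {
	//       "index": 2,
	//       "message": "could not broadcast Attestation: no peers"
	//     }
	//   ]
	// }
}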
@@ -247,8 +247,8 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
return
}
var attFailures []*server.IndexedVerificationFailure
var failedBroadcasts []string
var attFailures []*server.IndexedError
var failedBroadcasts []*server.IndexedError
if v >= version.Electra {
attFailures, failedBroadcasts, err = s.handleAttestationsElectra(ctx, req.Data)
@@ -260,29 +260,30 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
return
}
if len(failedBroadcasts) > 0 {
httputil.HandleError(
w,
fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")),
http.StatusInternalServerError,
)
return
}
if len(attFailures) > 0 {
failuresErr := &server.IndexedVerificationFailureError{
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusBadRequest,
Message: "One or more attestations failed validation",
Message: server.ErrIndexedValidationFail,
Failures: attFailures,
}
httputil.WriteError(w, failuresErr)
return
}
if len(failedBroadcasts) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusInternalServerError,
Message: server.ErrIndexedBroadcastFail,
Failures: failedBroadcasts,
}
httputil.WriteError(w, failuresErr)
return
}
}
func (s *Server) handleAttestationsElectra(
ctx context.Context,
data json.RawMessage,
) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) {
) (attFailures []*server.IndexedError, failedBroadcasts []*server.IndexedError, err error) {
var sourceAttestations []*structs.SingleAttestation
currentEpoch := slots.ToEpoch(s.TimeFetcher.CurrentSlot())
if currentEpoch < params.BeaconConfig().ElectraForkEpoch {
@@ -301,14 +302,14 @@ func (s *Server) handleAttestationsElectra(
for i, sourceAtt := range sourceAttestations {
att, err := sourceAtt.ToConsensus()
if err != nil {
attFailures = append(attFailures, &server.IndexedVerificationFailure{
attFailures = append(attFailures, &server.IndexedError{
Index: i,
Message: "Could not convert request attestation to consensus attestation: " + err.Error(),
})
continue
}
if _, err = bls.SignatureFromBytes(att.Signature); err != nil {
attFailures = append(attFailures, &server.IndexedVerificationFailure{
attFailures = append(attFailures, &server.IndexedError{
Index: i,
Message: "Incorrect attestation signature: " + err.Error(),
})
@@ -317,6 +318,13 @@ func (s *Server) handleAttestationsElectra(
validAttestations = append(validAttestations, att)
}
// We store the error for the first failed broadcast and use it in the log message in case
// there are broadcast issues. Having a single log at the end instead of logging
// for every failed broadcast prevents log noise in case there are many failures.
// Even though we only retain the first error, there is a very good chance that all
// broadcasts fail for the same reason, so this should be sufficient in most cases.
var broadcastErr error
for i, singleAtt := range validAttestations {
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SingleAttReceived,
@@ -338,31 +346,45 @@ func (s *Server) handleAttestationsElectra(
wantedEpoch := slots.ToEpoch(att.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
return nil, nil, errors.Wrap(err, "could not get head validator indices")
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
log.WithError(err).Errorf("could not broadcast attestation at index %d", i)
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
Message: server.NewBroadcastFailedError("SingleAttestation", err).Error(),
})
if broadcastErr == nil {
broadcastErr = err
}
continue
}
if features.Get().EnableExperimentalAttestationPool {
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("could not save attestation")
log.WithError(err).Error("Could not save attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save attestation")
log.WithError(err).Error("Could not save attestation")
}
}
}
if len(failedBroadcasts) > 0 {
log.WithFields(logrus.Fields{
"failedCount": len(failedBroadcasts),
"totalCount": len(validAttestations),
}).WithError(broadcastErr).Error("Some attestations failed to be broadcast")
}
return attFailures, failedBroadcasts, nil
}
func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) {
func (s *Server) handleAttestations(
ctx context.Context,
data json.RawMessage,
) (attFailures []*server.IndexedError, failedBroadcasts []*server.IndexedError, err error) {
var sourceAttestations []*structs.Attestation
if slots.ToEpoch(s.TimeFetcher.CurrentSlot()) >= params.BeaconConfig().ElectraForkEpoch {
@@ -381,14 +403,14 @@ func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (
for i, sourceAtt := range sourceAttestations {
att, err := sourceAtt.ToConsensus()
if err != nil {
attFailures = append(attFailures, &server.IndexedVerificationFailure{
attFailures = append(attFailures, &server.IndexedError{
Index: i,
Message: "Could not convert request attestation to consensus attestation: " + err.Error(),
})
continue
}
if _, err = bls.SignatureFromBytes(att.Signature); err != nil {
attFailures = append(attFailures, &server.IndexedVerificationFailure{
attFailures = append(attFailures, &server.IndexedError{
Index: i,
Message: "Incorrect attestation signature: " + err.Error(),
})
@@ -397,6 +419,13 @@ func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (
validAttestations = append(validAttestations, att)
}
// We store the error for the first failed broadcast and use it in the log message in case
// there are broadcast issues. Having a single log at the end instead of logging
// for every failed broadcast prevents log noise in case there are many failures.
// Even though we only retain the first error, there is a very good chance that all
// broadcasts fail for the same reason, so this should be sufficient in most cases.
var broadcastErr error
for i, att := range validAttestations {
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
@@ -413,32 +442,43 @@ func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (
wantedEpoch := slots.ToEpoch(att.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
return nil, nil, errors.Wrap(err, "could not get head validator indices")
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil {
log.WithError(err).Errorf("could not broadcast attestation at index %d", i)
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
Message: server.NewBroadcastFailedError("Attestation", err).Error(),
})
if broadcastErr == nil {
broadcastErr = err
}
continue
}
if features.Get().EnableExperimentalAttestationPool {
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("could not save attestation")
log.WithError(err).Error("Could not save attestation")
}
} else if att.IsAggregated() {
if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save aggregated attestation")
log.WithError(err).Error("Could not save aggregated attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save unaggregated attestation")
log.WithError(err).Error("Could not save unaggregated attestation")
}
}
}
if len(failedBroadcasts) > 0 {
log.WithFields(logrus.Fields{
"failedCount": len(failedBroadcasts),
"totalCount": len(validAttestations),
}).WithError(broadcastErr).Error("Some attestations failed to be broadcast")
}
return attFailures, failedBroadcasts, nil
}
@@ -541,11 +581,11 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
}
var validMessages []*eth.SyncCommitteeMessage
var msgFailures []*server.IndexedVerificationFailure
var msgFailures []*server.IndexedError
for i, sourceMsg := range req.Data {
msg, err := sourceMsg.ToConsensus()
if err != nil {
msgFailures = append(msgFailures, &server.IndexedVerificationFailure{
msgFailures = append(msgFailures, &server.IndexedError{
Index: i,
Message: "Could not convert request message to consensus message: " + err.Error(),
})
@@ -562,7 +602,7 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
}
if len(msgFailures) > 0 {
failuresErr := &server.IndexedVerificationFailureError{
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusBadRequest,
Message: "One or more messages failed validation",
Failures: msgFailures,
@@ -581,7 +621,7 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
httputil.HandleError(w, fmt.Sprintf("Could not get head state: %v", err), http.StatusInternalServerError)
return
}
var failures []*server.IndexedVerificationFailure
var failures []*server.IndexedError
var toBroadcast []*eth.SignedBLSToExecutionChange
var req []*structs.SignedBLSToExecutionChange
@@ -602,7 +642,7 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
for i, change := range req {
sbls, err := change.ToConsensus()
if err != nil {
failures = append(failures, &server.IndexedVerificationFailure{
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Unable to decode SignedBLSToExecutionChange: " + err.Error(),
})
@@ -610,14 +650,14 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
}
_, err = blocks.ValidateBLSToExecutionChange(st, sbls)
if err != nil {
failures = append(failures, &server.IndexedVerificationFailure{
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not validate SignedBLSToExecutionChange: " + err.Error(),
})
continue
}
if err := blocks.VerifyBLSChangeSignature(st, sbls); err != nil {
failures = append(failures, &server.IndexedVerificationFailure{
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not validate signature: " + err.Error(),
})
@@ -636,9 +676,9 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
}
go s.broadcastBLSChanges(context.Background(), toBroadcast)
if len(failures) > 0 {
failuresErr := &server.IndexedVerificationFailureError{
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusBadRequest,
Message: "One or more BLSToExecutionChange failed validation",
Message: server.ErrIndexedValidationFail,
Failures: failures,
}
httputil.WriteError(w, failuresErr)
@@ -655,18 +695,18 @@ func (s *Server) broadcastBLSBatch(ctx context.Context, ptr *[]*eth.SignedBLSToE
}
st, err := s.ChainInfoFetcher.HeadStateReadOnly(ctx)
if err != nil {
log.WithError(err).Error("could not get head state")
log.WithError(err).Error("Could not get head state")
return
}
for _, ch := range (*ptr)[:limit] {
if ch != nil {
_, err := blocks.ValidateBLSToExecutionChange(st, ch)
if err != nil {
log.WithError(err).Error("could not validate BLS to execution change")
log.WithError(err).Error("Could not validate BLS to execution change")
continue
}
if err := s.Broadcaster.Broadcast(ctx, ch); err != nil {
log.WithError(err).Error("could not broadcast BLS to execution changes.")
log.WithError(err).Error("Could not broadcast BLS to execution changes.")
}
}
}

View File

@@ -638,7 +638,7 @@ func TestSubmitAttestations(t *testing.T) {
s.SubmitAttestations(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
e := &server.IndexedVerificationFailureError{}
e := &server.IndexedErrorContainer{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, e.Code)
require.Equal(t, 1, len(e.Failures))
@@ -772,7 +772,7 @@ func TestSubmitAttestations(t *testing.T) {
s.SubmitAttestationsV2(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
e := &server.IndexedVerificationFailureError{}
e := &server.IndexedErrorContainer{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, e.Code)
require.Equal(t, 1, len(e.Failures))
@@ -873,7 +873,7 @@ func TestSubmitAttestations(t *testing.T) {
s.SubmitAttestationsV2(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
e := &server.IndexedVerificationFailureError{}
e := &server.IndexedErrorContainer{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, e.Code)
require.Equal(t, 1, len(e.Failures))
@@ -1538,7 +1538,7 @@ func TestSubmitSignedBLSToExecutionChanges_Failures(t *testing.T) {
s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
time.Sleep(10 * time.Millisecond) // Delay to allow the routine to start
require.StringContains(t, "One or more BLSToExecutionChange failed validation", writer.Body.String())
require.StringContains(t, "One or more messages failed validation", writer.Body.String())
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)+1)

View File

@@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/server:go_default_library",
"//api/server/structs:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/builder:go_default_library",

View File

@@ -14,6 +14,7 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/server"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/beacon-chain/builder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
@@ -268,22 +269,61 @@ func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Requ
return
}
for _, item := range reqData {
var failures []*server.IndexedError
var failedBroadcasts []*server.IndexedError
for i, item := range reqData {
var contribution structs.SignedContributionAndProof
if err := json.Unmarshal(item, &contribution); err != nil {
httputil.HandleError(w, "Could not decode item: "+err.Error(), http.StatusBadRequest)
return
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not unmarshal message: " + err.Error(),
})
continue
}
consensusItem, err := contribution.ToConsensus()
if err != nil {
httputil.HandleError(w, "Could not convert contribution to consensus format: "+err.Error(), http.StatusBadRequest)
return
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not convert request contribution to consensus contribution: " + err.Error(),
})
continue
}
if rpcError := s.CoreService.SubmitSignedContributionAndProof(ctx, consensusItem); rpcError != nil {
httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
return
rpcError := s.CoreService.SubmitSignedContributionAndProof(ctx, consensusItem)
if rpcError != nil {
var broadcastFailedErr *server.BroadcastFailedError
if errors.As(rpcError.Err, &broadcastFailedErr) {
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
Message: rpcError.Err.Error(),
})
continue
} else {
httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
return
}
}
}
if len(failures) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusBadRequest,
Message: server.ErrIndexedValidationFail,
Failures: failures,
}
httputil.WriteError(w, failuresErr)
return
}
if len(failedBroadcasts) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusInternalServerError,
Message: server.ErrIndexedBroadcastFail,
Failures: failedBroadcasts,
}
httputil.WriteError(w, failuresErr)
return
}
}
// Deprecated: use SubmitAggregateAndProofsV2 instead
@@ -322,8 +362,8 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request
}
rpcError := s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem)
if rpcError != nil {
var aggregateBroadcastFailedError *core.AggregateBroadcastFailedError
ok := errors.As(rpcError.Err, &aggregateBroadcastFailedError)
var broadcastFailedErr *server.BroadcastFailedError
ok := errors.As(rpcError.Err, &broadcastFailedErr)
if ok {
broadcastFailed = true
} else {
@@ -368,49 +408,83 @@ func (s *Server) SubmitAggregateAndProofsV2(w http.ResponseWriter, r *http.Reque
return
}
broadcastFailed := false
var failures []*server.IndexedError
var failedBroadcasts []*server.IndexedError
var rpcError *core.RpcError
for _, raw := range reqData {
for i, raw := range reqData {
if v >= version.Electra {
var signedAggregate structs.SignedAggregateAttestationAndProofElectra
err = json.Unmarshal(raw, &signedAggregate)
if err != nil {
httputil.HandleError(w, "Failed to parse aggregate attestation and proof: "+err.Error(), http.StatusBadRequest)
return
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not parse message: " + err.Error(),
})
continue
}
consensusItem, err := signedAggregate.ToConsensus()
if err != nil {
httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not convert request aggregate to consensus aggregate: " + err.Error(),
})
continue
}
rpcError = s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem)
} else {
var signedAggregate structs.SignedAggregateAttestationAndProof
err = json.Unmarshal(raw, &signedAggregate)
if err != nil {
httputil.HandleError(w, "Failed to parse aggregate attestation and proof: "+err.Error(), http.StatusBadRequest)
return
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not parse message: " + err.Error(),
})
continue
}
consensusItem, err := signedAggregate.ToConsensus()
if err != nil {
httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not convert request aggregate to consensus aggregate: " + err.Error(),
})
continue
}
rpcError = s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem)
}
if rpcError != nil {
var aggregateBroadcastFailedError *core.AggregateBroadcastFailedError
if errors.As(rpcError.Err, &aggregateBroadcastFailedError) {
broadcastFailed = true
var broadcastFailedErr *server.BroadcastFailedError
if errors.As(rpcError.Err, &broadcastFailedErr) {
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
Message: rpcError.Err.Error(),
})
continue
} else {
httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
return
}
}
}
if broadcastFailed {
httputil.HandleError(w, "Could not broadcast one or more signed aggregated attestations", http.StatusInternalServerError)
if len(failures) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusBadRequest,
Message: server.ErrIndexedValidationFail,
Failures: failures,
}
httputil.WriteError(w, failuresErr)
return
}
if len(failedBroadcasts) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusInternalServerError,
Message: server.ErrIndexedBroadcastFail,
Failures: failedBroadcasts,
}
httputil.WriteError(w, failuresErr)
return
}
}

View File

@@ -283,11 +283,6 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, opts ...options.
return make([]*blocks.VerifiedROBlob, 0), nil
}
fuluForkSlot, err := slots.EpochStart(params.BeaconConfig().FuluForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate Fulu start slot"), Reason: core.Internal}
}
// Convert versioned hashes to indices if provided
indices := cfg.Indices
if len(cfg.VersionedHashes) > 0 {
@@ -328,7 +323,7 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, opts ...options.
}
}
if roBlock.Slot() >= fuluForkSlot {
if roBlock.Slot() >= params.SlotOrFarFuture(params.BeaconConfig().FuluForkEpoch) {
roBlock, err := blocks.NewROBlockWithRoot(roSignedBlock, root)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrapf(err, "failed to create roBlock with root %#x", root), Reason: core.Internal}

View File

@@ -587,6 +587,51 @@ func TestGetBlob(t *testing.T) {
require.Equal(t, http.StatusBadRequest, core.ErrorReasonToHTTP(rpcErr.Reason))
require.StringContains(t, "not supported before", rpcErr.Err.Error())
})
t.Run("fulu fork epoch not set (MaxUint64)", func(t *testing.T) {
// Setup with Deneb fork enabled but Fulu fork epoch set to MaxUint64 (not set/far future)
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.DenebForkEpoch = 1
cfg.FuluForkEpoch = primitives.Epoch(math.MaxUint64) // Not set / far future
params.OverrideBeaconConfig(cfg)
// Create and save Deneb block and blob sidecars
denebSlot := util.SlotAtEpoch(t, cfg.DenebForkEpoch)
_, tempBlobStorage := filesystem.NewEphemeralBlobStorageAndFs(t)
denebBlockWithBlobs, denebBlobSidecars := util.GenerateTestDenebBlockWithSidecar(t, [fieldparams.RootLength]byte{}, denebSlot, 2, util.WithDenebSlot(denebSlot))
denebBlockRoot := denebBlockWithBlobs.Root()
verifiedDenebBlobs := verification.FakeVerifySliceForTest(t, denebBlobSidecars)
for i := range verifiedDenebBlobs {
err := tempBlobStorage.Save(verifiedDenebBlobs[i])
require.NoError(t, err)
}
err := db.SaveBlock(t.Context(), denebBlockWithBlobs)
require.NoError(t, err)
blocker := &BeaconDbBlocker{
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: tempBlobStorage,
}
// Should successfully retrieve blobs even when FuluForkEpoch is not set
retrievedBlobs, rpcErr := blocker.Blobs(ctx, hexutil.Encode(denebBlockRoot[:]))
require.IsNil(t, rpcErr)
require.Equal(t, 2, len(retrievedBlobs))
// Verify blob content matches
for i, retrievedBlob := range retrievedBlobs {
require.NotNil(t, retrievedBlob.BlobSidecar)
require.DeepEqual(t, denebBlobSidecars[i].Blob, retrievedBlob.Blob)
require.DeepEqual(t, denebBlobSidecars[i].KzgCommitment, retrievedBlob.KzgCommitment)
}
})
}
func TestBlobs_CommitmentOrdering(t *testing.T) {

View File

@@ -0,0 +1,2 @@
### Fixed
- `HasAtLeastOneIndex`: Check the index is not too high.

View File

@@ -0,0 +1,3 @@
### Ignored
- Remove redundant check for genesis root at startup.

View File

@@ -0,0 +1,3 @@
### Changed
- Improve returning individual message errors from Beacon API.

View File

@@ -0,0 +1,3 @@
### Fixed
- Display error messages from the server verbatim when they are not encoded as `application/json`.

View File

@@ -219,8 +219,8 @@ func TestCallWithdrawalEndpoint_Errors(t *testing.T) {
if r.Method == http.MethodPost && r.RequestURI == "/eth/v1/beacon/pool/bls_to_execution_changes" {
w.WriteHeader(400)
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(&server.IndexedVerificationFailureError{
Failures: []*server.IndexedVerificationFailure{
err = json.NewEncoder(w).Encode(&server.IndexedErrorContainer{
Failures: []*server.IndexedError{
{Index: 0, Message: "Could not validate SignedBLSToExecutionChange"},
},
})

View File

@@ -22,6 +22,8 @@ import (
"github.com/pkg/errors"
)
const MaxSlot = primitives.Slot(math.MaxUint64)
// BeaconChainConfig contains constant configs for node to participate in beacon chain.
type BeaconChainConfig struct {
// Constants (non-configurable)
@@ -322,6 +324,16 @@ type BeaconChainConfig struct {
forkSchedule *NetworkSchedule
bpoSchedule *NetworkSchedule
networkSchedule *NetworkSchedule
maxe primitives.Epoch
}
// MaxEpoch computes the highest epoch value that can be used without overflowing in the conversion to slots.
func (b *BeaconChainConfig) MaxEpoch() primitives.Epoch {
if b.maxe == 0 {
b.maxe = primitives.Epoch(MaxSlot / b.SlotsPerEpoch)
}
return b.maxe
}
func (b *BeaconChainConfig) VersionToForkEpochMap() map[int]primitives.Epoch {

View File

@@ -111,3 +111,13 @@ func genesisNetworkScheduleEntry() NetworkScheduleEntry {
// a properly initialized fork schedule.
return NetworkScheduleEntry{Epoch: b.GenesisEpoch, isFork: true, ForkVersion: to4(b.GenesisForkVersion), VersionEnum: version.Phase0}
}
// SlotOrFarFuture either returns the slot for the given epoch
// or the maximum slot value if the conversion to slot will overflow.
func SlotOrFarFuture(e primitives.Epoch) primitives.Slot {
cfg := BeaconConfig()
if e > cfg.MaxEpoch() {
return MaxSlot
}
return primitives.Slot(e) * cfg.SlotsPerEpoch
}
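
MaxEpoch caps the epoch at MaxUint64 / SlotsPerEpoch, so SlotOrFarFuture can multiply safely and an unset fork epoch (FarFutureEpoch = MaxUint64) maps to MaxSlot instead of wrapping around. The arithmetic, sketched standalone with mainnet's 32 slots per epoch assumed:

package main

import (
	"fmt"
	"math"
)

func main() {
	const slotsPerEpoch = uint64(32)
	maxEpoch := math.MaxUint64 / slotsPerEpoch // highest epoch whose slot still fits in a uint64

	slotOrFarFuture := func(e uint64) uint64 {
		if e > maxEpoch {
			return math.MaxUint64 // MaxSlot: the conversion would overflow
		}
		return e * slotsPerEpoch
	}

	fmt.Println(slotOrFarFuture(100))            // 3200
	fmt.Println(slotOrFarFuture(math.MaxUint64)) // 18446744073709551615 (MaxSlot)
}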

View File

@@ -135,7 +135,7 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
decoder := json.NewDecoder(bytes.NewBuffer(body))
errorJson := &httputil.DefaultJsonError{}
if err = decoder.Decode(errorJson); err != nil {
return nil, nil, errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL)
return nil, nil, fmt.Errorf("HTTP request for %s unsuccessful (%d: %s)", httpResp.Request.URL, httpResp.StatusCode, string(body))
}
return nil, nil, errorJson
}
@@ -241,7 +241,7 @@ func (c *BeaconApiRestHandler) PostSSZ(
decoder := json.NewDecoder(bytes.NewBuffer(body))
errorJson := &httputil.DefaultJsonError{}
if err = decoder.Decode(errorJson); err != nil {
return nil, nil, errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL)
return nil, nil, fmt.Errorf("HTTP request for %s unsuccessful (%d: %s)", httpResp.Request.URL, httpResp.StatusCode, string(body))
}
return nil, nil, errorJson
}

View File

@@ -344,4 +344,18 @@ func Test_decodeResp(t *testing.T) {
err = decodeResp(r, nil)
assert.ErrorContains(t, "failed to decode response body into error json", err)
})
t.Run("500 not JSON", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {"text/plain"}},
Request: &http.Request{},
}
err = decodeResp(r, nil)
assert.ErrorContains(t, "HTTP request unsuccessful (500: foo)", err)
})
}