Compare commits

..

1 Commits

Author SHA1 Message Date
Manu NALEPA
496ddb8015 subscribe-all-attestations-and-sync-subnets 2025-10-13 15:03:32 +02:00
66 changed files with 354 additions and 944 deletions

View File

@@ -284,7 +284,7 @@ func (c *Client) SubmitChangeBLStoExecution(ctx context.Context, request []*stru
if resp.StatusCode != http.StatusOK {
decoder := json.NewDecoder(resp.Body)
decoder.DisallowUnknownFields()
errorJson := &server.IndexedErrorContainer{}
errorJson := &server.IndexedVerificationFailureError{}
if err := decoder.Decode(errorJson); err != nil {
return errors.Wrapf(err, "failed to decode error JSON for %s", resp.Request.URL)
}

View File

@@ -726,12 +726,6 @@ func unexpectedStatusErr(response *http.Response, expected int) error {
return errors.Wrap(jsonErr, "unable to read response body")
}
return errors.Wrap(ErrNotOK, errMessage.Message)
case http.StatusBadGateway:
log.WithError(ErrBadGateway).Debug(msg)
if jsonErr := json.Unmarshal(bodyBytes, &errMessage); jsonErr != nil {
return errors.Wrap(jsonErr, "unable to read response body")
}
return errors.Wrap(ErrBadGateway, errMessage.Message)
default:
log.WithError(ErrNotOK).Debug(msg)
return errors.Wrap(ErrNotOK, fmt.Sprintf("unsupported error code: %d", response.StatusCode))

View File

@@ -12,6 +12,7 @@ import (
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
@@ -21,7 +22,6 @@ import (
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/prysmaticlabs/go-bitfield"
log "github.com/sirupsen/logrus"
)

View File

@@ -21,4 +21,3 @@ var ErrUnsupportedMediaType = errors.Wrap(ErrNotOK, "The media type in \"Content
// ErrNotAcceptable specifically means that a '406 - Not Acceptable' was received from the API.
var ErrNotAcceptable = errors.Wrap(ErrNotOK, "The accept header value is not acceptable")
var ErrBadGateway = errors.Wrap(ErrNotOK, "recv 502 BadGateway response from API")

View File

@@ -6,11 +6,6 @@ import (
"strings"
)
var (
ErrIndexedValidationFail = "One or more messages failed validation"
ErrIndexedBroadcastFail = "One or more messages failed broadcast"
)
// DecodeError represents an error resulting from trying to decode an HTTP request.
// It tracks the full field name for which decoding failed.
type DecodeError struct {
@@ -34,38 +29,19 @@ func (e *DecodeError) Error() string {
return fmt.Sprintf("could not decode %s: %s", strings.Join(e.path, "."), e.err.Error())
}
// IndexedErrorContainer wraps a collection of indexed errors.
type IndexedErrorContainer struct {
Message string `json:"message"`
Code int `json:"code"`
Failures []*IndexedError `json:"failures"`
// IndexedVerificationFailureError wraps a collection of verification failures.
type IndexedVerificationFailureError struct {
Message string `json:"message"`
Code int `json:"code"`
Failures []*IndexedVerificationFailure `json:"failures"`
}
func (e *IndexedErrorContainer) StatusCode() int {
func (e *IndexedVerificationFailureError) StatusCode() int {
return e.Code
}
// IndexedError represents an issue when processing a single indexed object e.g. an item in an array.
type IndexedError struct {
// IndexedVerificationFailure represents an issue when verifying a single indexed object e.g. an item in an array.
type IndexedVerificationFailure struct {
Index int `json:"index"`
Message string `json:"message"`
}
// BroadcastFailedError represents an error scenario where broadcasting a published message failed.
type BroadcastFailedError struct {
msg string
err error
}
// NewBroadcastFailedError creates a new instance of BroadcastFailedError.
func NewBroadcastFailedError(msg string, err error) *BroadcastFailedError {
return &BroadcastFailedError{
msg: msg,
err: err,
}
}
// Error returns the underlying error message.
func (e *BroadcastFailedError) Error() string {
return fmt.Sprintf("could not broadcast %s: %s", e.msg, e.err.Error())
}

View File

@@ -3302,6 +3302,7 @@ func Test_postBlockProcess_EventSending(t *testing.T) {
}
}
func setupLightClientTestRequirements(ctx context.Context, t *testing.T, s *Service, v int, options ...util.LightClientOption) (*util.TestLightClient, *postBlockProcessConfig) {
var l *util.TestLightClient
switch v {

View File

@@ -6,4 +6,3 @@ var errNilSignedWithdrawalMessage = errors.New("nil SignedBLSToExecutionChange m
var errNilWithdrawalMessage = errors.New("nil BLSToExecutionChange message")
var errInvalidBLSPrefix = errors.New("withdrawal credential prefix is not a BLS prefix")
var errInvalidWithdrawalCredentials = errors.New("withdrawal credentials do not match")
var ErrInvalidSignature = errors.New("invalid signature")

View File

@@ -114,12 +114,9 @@ func VerifyBlockSignatureUsingCurrentFork(beaconState state.ReadOnlyBeaconState,
}
proposerPubKey := proposer.PublicKey
sig := blk.Signature()
if err := signing.VerifyBlockSigningRoot(proposerPubKey, sig[:], domain, func() ([32]byte, error) {
return signing.VerifyBlockSigningRoot(proposerPubKey, sig[:], domain, func() ([32]byte, error) {
return blkRoot, nil
}); err != nil {
return ErrInvalidSignature
}
return nil
})
}
// BlockSignatureBatch retrieves the block signature batch from the provided block and its corresponding state.

View File

@@ -89,36 +89,3 @@ func TestVerifyBlockSignatureUsingCurrentFork(t *testing.T) {
require.NoError(t, err)
assert.NoError(t, blocks.VerifyBlockSignatureUsingCurrentFork(bState, wsb, blkRoot))
}
func TestVerifyBlockSignatureUsingCurrentFork_InvalidSignature(t *testing.T) {
params.SetupTestConfigCleanup(t)
bCfg := params.BeaconConfig()
bCfg.AltairForkEpoch = 100
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = 100
params.OverrideBeaconConfig(bCfg)
bState, keys := util.DeterministicGenesisState(t, 100)
altairBlk := util.NewBeaconBlockAltair()
altairBlk.Block.ProposerIndex = 0
altairBlk.Block.Slot = params.BeaconConfig().SlotsPerEpoch * 100
blkRoot, err := altairBlk.Block.HashTreeRoot()
assert.NoError(t, err)
// Sign with wrong key (proposer index 0, but using key 1)
fData := &ethpb.Fork{
Epoch: 100,
CurrentVersion: params.BeaconConfig().AltairForkVersion,
PreviousVersion: params.BeaconConfig().GenesisForkVersion,
}
domain, err := signing.Domain(fData, 100, params.BeaconConfig().DomainBeaconProposer, bState.GenesisValidatorsRoot())
assert.NoError(t, err)
rt, err := signing.ComputeSigningRoot(altairBlk.Block, domain)
assert.NoError(t, err)
wrongSig := keys[1].Sign(rt[:]).Marshal()
altairBlk.Signature = wrongSig
wsb, err := consensusblocks.NewSignedBeaconBlock(altairBlk)
require.NoError(t, err)
err = blocks.VerifyBlockSignatureUsingCurrentFork(bState, wsb, blkRoot)
require.ErrorIs(t, err, blocks.ErrInvalidSignature, "Expected ErrInvalidSignature for invalid signature")
}

View File

@@ -257,7 +257,7 @@ func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([
return nil, errors.Wrap(err, "compute cells")
}
kzgProofs := make([]kzg.Proof, 0, numberOfColumns)
kzgProofs := make([]kzg.Proof, 0, numberOfColumns*kzg.BytesPerProof)
for _, kzgProofBytes := range blobAndProof.KzgProofs {
if len(kzgProofBytes) != kzg.BytesPerProof {
return nil, errors.New("wrong KZG proof size - should never happen")

View File

@@ -441,7 +441,6 @@ func TestComputeCellsAndProofsFromStructured(t *testing.T) {
for i := range blobCount {
require.Equal(t, len(expectedCellsAndProofs[i].Cells), len(actualCellsAndProofs[i].Cells))
require.Equal(t, len(expectedCellsAndProofs[i].Proofs), len(actualCellsAndProofs[i].Proofs))
require.Equal(t, len(expectedCellsAndProofs[i].Proofs), cap(actualCellsAndProofs[i].Proofs))
// Compare cells
for j, expectedCell := range expectedCellsAndProofs[i].Cells {

View File

@@ -35,9 +35,8 @@ func (s DataColumnStorageSummary) HasIndex(index uint64) bool {
// HasAtLeastOneIndex returns true if at least one of the DataColumnSidecars at the given indices is available in the filesystem.
func (s DataColumnStorageSummary) HasAtLeastOneIndex(indices []uint64) bool {
size := uint64(len(s.mask))
for _, index := range indices {
if index < size && s.mask[index] {
if s.mask[index] {
return true
}
}

View File

@@ -25,11 +25,11 @@ func TestHasIndex(t *testing.T) {
func TestHasAtLeastOneIndex(t *testing.T) {
summary := NewDataColumnStorageSummary(0, [fieldparams.NumberOfColumns]bool{false, true})
actual := summary.HasAtLeastOneIndex([]uint64{3, 1, fieldparams.NumberOfColumns, 2})
require.Equal(t, true, actual)
hasAtLeastOneIndex := summary.HasAtLeastOneIndex([]uint64{3, 1, 2})
require.Equal(t, true, hasAtLeastOneIndex)
actual = summary.HasAtLeastOneIndex([]uint64{3, 4, fieldparams.NumberOfColumns, 2})
require.Equal(t, false, actual)
hasAtLeastOneIndex = summary.HasAtLeastOneIndex([]uint64{3, 4, 2})
require.Equal(t, false, hasAtLeastOneIndex)
}
func TestCount(t *testing.T) {

View File

@@ -122,7 +122,6 @@ type Reconstructor interface {
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error)
GetBlobsV2(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProofV2, error)
}
// EngineCaller defines a client that can interact with an Ethereum

View File

@@ -58,6 +58,7 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/slice:go_default_library",
"//encoding/bytesutil:go_default_library",
"//genesis:go_default_library",
"//monitoring/prometheus:go_default_library",
"//monitoring/tracing:go_default_library",

View File

@@ -60,6 +60,7 @@ import (
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/container/slice"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/genesis"
"github.com/OffchainLabs/prysm/v6/monitoring/prometheus"
"github.com/OffchainLabs/prysm/v6/runtime"
@@ -597,7 +598,22 @@ func (b *BeaconNode) startStateGen(ctx context.Context, bfs coverage.AvailableBl
return err
}
b.finalizedStateAtStartUp, err = sg.StateByRoot(ctx, [32]byte(cp.Root))
r := bytesutil.ToBytes32(cp.Root)
// Consider edge case where finalized root are zeros instead of genesis root hash.
if r == params.BeaconConfig().ZeroHash {
genesisBlock, err := b.db.GenesisBlock(ctx)
if err != nil {
return err
}
if genesisBlock != nil && !genesisBlock.IsNil() {
r, err = genesisBlock.Block().HashTreeRoot()
if err != nil {
return err
}
}
}
b.finalizedStateAtStartUp, err = sg.StateByRoot(ctx, r)
if err != nil {
return err
}
@@ -828,7 +844,6 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithSlasherEnabled(b.slasherEnabled),
regularsync.WithLightClientStore(b.lcStore),
regularsync.WithBatchVerifierLimit(b.cliCtx.Int(flags.BatchVerifierLimit.Name)),
regularsync.WithBlobToAddress(b.cliCtx.String(flags.BlobToAddressFlag.Name)),
)
return b.services.RegisterService(rs)
}

View File

@@ -12,7 +12,6 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/rpc/core",
visibility = ["//visibility:public"],
deps = [
"//api/server:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",

View File

@@ -7,7 +7,6 @@ import (
"sort"
"time"
"github.com/OffchainLabs/prysm/v6/api/server"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/epoch/precompute"
@@ -37,6 +36,24 @@ import (
var errOptimisticMode = errors.New("the node is currently optimistic and cannot serve validators")
// AggregateBroadcastFailedError represents an error scenario where
// broadcasting an aggregate selection proof failed.
type AggregateBroadcastFailedError struct {
err error
}
// NewAggregateBroadcastFailedError creates a new error instance.
func NewAggregateBroadcastFailedError(err error) AggregateBroadcastFailedError {
return AggregateBroadcastFailedError{
err: err,
}
}
// Error returns the underlying error message.
func (e *AggregateBroadcastFailedError) Error() string {
return fmt.Sprintf("could not broadcast signed aggregated attestation: %s", e.err.Error())
}
// ComputeValidatorPerformance reports the validator's latest balance along with other important metrics on
// rewards and penalties throughout its lifecycle in the beacon chain.
func (s *Service) ComputeValidatorPerformance(
@@ -343,8 +360,7 @@ func (s *Service) SubmitSignedContributionAndProof(
// Wait for p2p broadcast to complete and return the first error (if any)
err := errs.Wait()
if err != nil {
log.WithError(err).Debug("Could not broadcast signed contribution and proof")
return &RpcError{Err: server.NewBroadcastFailedError("SignedContributionAndProof", err), Reason: Internal}
return &RpcError{Err: err, Reason: Internal}
}
s.OperationNotifier.OperationFeed().Send(&feed.Event{
@@ -395,8 +411,7 @@ func (s *Service) SubmitSignedAggregateSelectionProof(
}
if err := s.Broadcaster.Broadcast(ctx, agg); err != nil {
log.WithError(err).Debug("Could not broadcast signed aggregate att and proof")
return &RpcError{Err: server.NewBroadcastFailedError("SignedAggregateAttAndProof", err), Reason: Internal}
return &RpcError{Err: &AggregateBroadcastFailedError{err: err}, Reason: Internal}
}
if logrus.GetLevel() >= logrus.DebugLevel {

View File

@@ -6,6 +6,8 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/OffchainLabs/prysm/v6/api"
@@ -29,7 +31,6 @@ import (
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const broadcastBLSChangesRateLimit = 128
@@ -199,23 +200,22 @@ func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) {
return
}
if len(failedBroadcasts) > 0 {
httputil.HandleError(
w,
fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")),
http.StatusInternalServerError,
)
return
}
if len(attFailures) > 0 {
failuresErr := &server.IndexedErrorContainer{
failuresErr := &server.IndexedVerificationFailureError{
Code: http.StatusBadRequest,
Message: server.ErrIndexedValidationFail,
Message: "One or more attestations failed validation",
Failures: attFailures,
}
httputil.WriteError(w, failuresErr)
return
}
if len(failedBroadcasts) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusInternalServerError,
Message: server.ErrIndexedBroadcastFail,
Failures: failedBroadcasts,
}
httputil.WriteError(w, failuresErr)
return
}
}
@@ -247,8 +247,8 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
return
}
var attFailures []*server.IndexedError
var failedBroadcasts []*server.IndexedError
var attFailures []*server.IndexedVerificationFailure
var failedBroadcasts []string
if v >= version.Electra {
attFailures, failedBroadcasts, err = s.handleAttestationsElectra(ctx, req.Data)
@@ -260,30 +260,29 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
return
}
if len(failedBroadcasts) > 0 {
httputil.HandleError(
w,
fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")),
http.StatusInternalServerError,
)
return
}
if len(attFailures) > 0 {
failuresErr := &server.IndexedErrorContainer{
failuresErr := &server.IndexedVerificationFailureError{
Code: http.StatusBadRequest,
Message: server.ErrIndexedValidationFail,
Message: "One or more attestations failed validation",
Failures: attFailures,
}
httputil.WriteError(w, failuresErr)
return
}
if len(failedBroadcasts) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusInternalServerError,
Message: server.ErrIndexedBroadcastFail,
Failures: failedBroadcasts,
}
httputil.WriteError(w, failuresErr)
return
}
}
func (s *Server) handleAttestationsElectra(
ctx context.Context,
data json.RawMessage,
) (attFailures []*server.IndexedError, failedBroadcasts []*server.IndexedError, err error) {
) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) {
var sourceAttestations []*structs.SingleAttestation
currentEpoch := slots.ToEpoch(s.TimeFetcher.CurrentSlot())
if currentEpoch < params.BeaconConfig().ElectraForkEpoch {
@@ -302,14 +301,14 @@ func (s *Server) handleAttestationsElectra(
for i, sourceAtt := range sourceAttestations {
att, err := sourceAtt.ToConsensus()
if err != nil {
attFailures = append(attFailures, &server.IndexedError{
attFailures = append(attFailures, &server.IndexedVerificationFailure{
Index: i,
Message: "Could not convert request attestation to consensus attestation: " + err.Error(),
})
continue
}
if _, err = bls.SignatureFromBytes(att.Signature); err != nil {
attFailures = append(attFailures, &server.IndexedError{
attFailures = append(attFailures, &server.IndexedVerificationFailure{
Index: i,
Message: "Incorrect attestation signature: " + err.Error(),
})
@@ -318,13 +317,6 @@ func (s *Server) handleAttestationsElectra(
validAttestations = append(validAttestations, att)
}
// We store the error for the first failed broadcast and use it in the log message in case
// there are broadcast issues. Having a single log at the end instead of logging
// for every failed broadcast prevents log noise in case there are many failures.
// Even though we only retain the first error, there is a very good chance that all
// broadcasts fail for the same reason, so this should be sufficient in most cases.
var broadcastErr error
for i, singleAtt := range validAttestations {
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SingleAttReceived,
@@ -346,45 +338,31 @@ func (s *Server) handleAttestationsElectra(
wantedEpoch := slots.ToEpoch(att.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get head validator indices")
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
Message: server.NewBroadcastFailedError("SingleAttestation", err).Error(),
})
if broadcastErr == nil {
broadcastErr = err
}
log.WithError(err).Errorf("could not broadcast attestation at index %d", i)
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
}
if features.Get().EnableExperimentalAttestationPool {
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
log.WithError(err).Error("could not save attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save attestation")
log.WithError(err).Error("could not save attestation")
}
}
}
if len(failedBroadcasts) > 0 {
log.WithFields(logrus.Fields{
"failedCount": len(failedBroadcasts),
"totalCount": len(validAttestations),
}).WithError(broadcastErr).Error("Some attestations failed to be broadcast")
}
return attFailures, failedBroadcasts, nil
}
func (s *Server) handleAttestations(
ctx context.Context,
data json.RawMessage,
) (attFailures []*server.IndexedError, failedBroadcasts []*server.IndexedError, err error) {
func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) {
var sourceAttestations []*structs.Attestation
if slots.ToEpoch(s.TimeFetcher.CurrentSlot()) >= params.BeaconConfig().ElectraForkEpoch {
@@ -403,14 +381,14 @@ func (s *Server) handleAttestations(
for i, sourceAtt := range sourceAttestations {
att, err := sourceAtt.ToConsensus()
if err != nil {
attFailures = append(attFailures, &server.IndexedError{
attFailures = append(attFailures, &server.IndexedVerificationFailure{
Index: i,
Message: "Could not convert request attestation to consensus attestation: " + err.Error(),
})
continue
}
if _, err = bls.SignatureFromBytes(att.Signature); err != nil {
attFailures = append(attFailures, &server.IndexedError{
attFailures = append(attFailures, &server.IndexedVerificationFailure{
Index: i,
Message: "Incorrect attestation signature: " + err.Error(),
})
@@ -419,13 +397,6 @@ func (s *Server) handleAttestations(
validAttestations = append(validAttestations, att)
}
// We store the error for the first failed broadcast and use it in the log message in case
// there are broadcast issues. Having a single log at the end instead of logging
// for every failed broadcast prevents log noise in case there are many failures.
// Even though we only retain the first error, there is a very good chance that all
// broadcasts fail for the same reason, so this should be sufficient in most cases.
var broadcastErr error
for i, att := range validAttestations {
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
@@ -442,43 +413,32 @@ func (s *Server) handleAttestations(
wantedEpoch := slots.ToEpoch(att.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get head validator indices")
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil {
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
Message: server.NewBroadcastFailedError("Attestation", err).Error(),
})
if broadcastErr == nil {
broadcastErr = err
}
log.WithError(err).Errorf("could not broadcast attestation at index %d", i)
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
}
if features.Get().EnableExperimentalAttestationPool {
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
log.WithError(err).Error("could not save attestation")
}
} else if att.IsAggregated() {
if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save aggregated attestation")
log.WithError(err).Error("could not save aggregated attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save unaggregated attestation")
log.WithError(err).Error("could not save unaggregated attestation")
}
}
}
if len(failedBroadcasts) > 0 {
log.WithFields(logrus.Fields{
"failedCount": len(failedBroadcasts),
"totalCount": len(validAttestations),
}).WithError(broadcastErr).Error("Some attestations failed to be broadcast")
}
return attFailures, failedBroadcasts, nil
}
@@ -581,11 +541,11 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
}
var validMessages []*eth.SyncCommitteeMessage
var msgFailures []*server.IndexedError
var msgFailures []*server.IndexedVerificationFailure
for i, sourceMsg := range req.Data {
msg, err := sourceMsg.ToConsensus()
if err != nil {
msgFailures = append(msgFailures, &server.IndexedError{
msgFailures = append(msgFailures, &server.IndexedVerificationFailure{
Index: i,
Message: "Could not convert request message to consensus message: " + err.Error(),
})
@@ -602,7 +562,7 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
}
if len(msgFailures) > 0 {
failuresErr := &server.IndexedErrorContainer{
failuresErr := &server.IndexedVerificationFailureError{
Code: http.StatusBadRequest,
Message: "One or more messages failed validation",
Failures: msgFailures,
@@ -621,7 +581,7 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
httputil.HandleError(w, fmt.Sprintf("Could not get head state: %v", err), http.StatusInternalServerError)
return
}
var failures []*server.IndexedError
var failures []*server.IndexedVerificationFailure
var toBroadcast []*eth.SignedBLSToExecutionChange
var req []*structs.SignedBLSToExecutionChange
@@ -642,7 +602,7 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
for i, change := range req {
sbls, err := change.ToConsensus()
if err != nil {
failures = append(failures, &server.IndexedError{
failures = append(failures, &server.IndexedVerificationFailure{
Index: i,
Message: "Unable to decode SignedBLSToExecutionChange: " + err.Error(),
})
@@ -650,14 +610,14 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
}
_, err = blocks.ValidateBLSToExecutionChange(st, sbls)
if err != nil {
failures = append(failures, &server.IndexedError{
failures = append(failures, &server.IndexedVerificationFailure{
Index: i,
Message: "Could not validate SignedBLSToExecutionChange: " + err.Error(),
})
continue
}
if err := blocks.VerifyBLSChangeSignature(st, sbls); err != nil {
failures = append(failures, &server.IndexedError{
failures = append(failures, &server.IndexedVerificationFailure{
Index: i,
Message: "Could not validate signature: " + err.Error(),
})
@@ -676,9 +636,9 @@ func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Requ
}
go s.broadcastBLSChanges(context.Background(), toBroadcast)
if len(failures) > 0 {
failuresErr := &server.IndexedErrorContainer{
failuresErr := &server.IndexedVerificationFailureError{
Code: http.StatusBadRequest,
Message: server.ErrIndexedValidationFail,
Message: "One or more BLSToExecutionChange failed validation",
Failures: failures,
}
httputil.WriteError(w, failuresErr)
@@ -695,18 +655,18 @@ func (s *Server) broadcastBLSBatch(ctx context.Context, ptr *[]*eth.SignedBLSToE
}
st, err := s.ChainInfoFetcher.HeadStateReadOnly(ctx)
if err != nil {
log.WithError(err).Error("Could not get head state")
log.WithError(err).Error("could not get head state")
return
}
for _, ch := range (*ptr)[:limit] {
if ch != nil {
_, err := blocks.ValidateBLSToExecutionChange(st, ch)
if err != nil {
log.WithError(err).Error("Could not validate BLS to execution change")
log.WithError(err).Error("could not validate BLS to execution change")
continue
}
if err := s.Broadcaster.Broadcast(ctx, ch); err != nil {
log.WithError(err).Error("Could not broadcast BLS to execution changes.")
log.WithError(err).Error("could not broadcast BLS to execution changes.")
}
}
}

View File

@@ -638,7 +638,7 @@ func TestSubmitAttestations(t *testing.T) {
s.SubmitAttestations(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
e := &server.IndexedErrorContainer{}
e := &server.IndexedVerificationFailureError{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, e.Code)
require.Equal(t, 1, len(e.Failures))
@@ -772,7 +772,7 @@ func TestSubmitAttestations(t *testing.T) {
s.SubmitAttestationsV2(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
e := &server.IndexedErrorContainer{}
e := &server.IndexedVerificationFailureError{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, e.Code)
require.Equal(t, 1, len(e.Failures))
@@ -873,7 +873,7 @@ func TestSubmitAttestations(t *testing.T) {
s.SubmitAttestationsV2(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
e := &server.IndexedErrorContainer{}
e := &server.IndexedVerificationFailureError{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), e))
assert.Equal(t, http.StatusBadRequest, e.Code)
require.Equal(t, 1, len(e.Failures))
@@ -1538,7 +1538,7 @@ func TestSubmitSignedBLSToExecutionChanges_Failures(t *testing.T) {
s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
time.Sleep(10 * time.Millisecond) // Delay to allow the routine to start
require.StringContains(t, "One or more messages failed validation", writer.Body.String())
require.StringContains(t, "One or more BLSToExecutionChange failed validation", writer.Body.String())
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)+1)

View File

@@ -12,7 +12,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/server:go_default_library",
"//api/server/structs:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/builder:go_default_library",

View File

@@ -14,7 +14,6 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/api"
"github.com/OffchainLabs/prysm/v6/api/server"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/beacon-chain/builder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
@@ -269,61 +268,22 @@ func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Requ
return
}
var failures []*server.IndexedError
var failedBroadcasts []*server.IndexedError
for i, item := range reqData {
for _, item := range reqData {
var contribution structs.SignedContributionAndProof
if err := json.Unmarshal(item, &contribution); err != nil {
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not unmarshal message: " + err.Error(),
})
continue
httputil.HandleError(w, "Could not decode item: "+err.Error(), http.StatusBadRequest)
return
}
consensusItem, err := contribution.ToConsensus()
if err != nil {
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not convert request contribution to consensus contribution: " + err.Error(),
})
continue
httputil.HandleError(w, "Could not convert contribution to consensus format: "+err.Error(), http.StatusBadRequest)
return
}
rpcError := s.CoreService.SubmitSignedContributionAndProof(ctx, consensusItem)
if rpcError != nil {
var broadcastFailedErr *server.BroadcastFailedError
if errors.As(rpcError.Err, &broadcastFailedErr) {
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
Message: rpcError.Err.Error(),
})
continue
} else {
httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
return
}
if rpcError := s.CoreService.SubmitSignedContributionAndProof(ctx, consensusItem); rpcError != nil {
httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
return
}
}
if len(failures) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusBadRequest,
Message: server.ErrIndexedValidationFail,
Failures: failures,
}
httputil.WriteError(w, failuresErr)
return
}
if len(failedBroadcasts) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusInternalServerError,
Message: server.ErrIndexedBroadcastFail,
Failures: failedBroadcasts,
}
httputil.WriteError(w, failuresErr)
return
}
}
// Deprecated: use SubmitAggregateAndProofsV2 instead
@@ -362,8 +322,8 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request
}
rpcError := s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem)
if rpcError != nil {
var broadcastFailedErr *server.BroadcastFailedError
ok := errors.As(rpcError.Err, &broadcastFailedErr)
var aggregateBroadcastFailedError *core.AggregateBroadcastFailedError
ok := errors.As(rpcError.Err, &aggregateBroadcastFailedError)
if ok {
broadcastFailed = true
} else {
@@ -408,83 +368,49 @@ func (s *Server) SubmitAggregateAndProofsV2(w http.ResponseWriter, r *http.Reque
return
}
var failures []*server.IndexedError
var failedBroadcasts []*server.IndexedError
broadcastFailed := false
var rpcError *core.RpcError
for i, raw := range reqData {
for _, raw := range reqData {
if v >= version.Electra {
var signedAggregate structs.SignedAggregateAttestationAndProofElectra
err = json.Unmarshal(raw, &signedAggregate)
if err != nil {
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not parse message: " + err.Error(),
})
continue
httputil.HandleError(w, "Failed to parse aggregate attestation and proof: "+err.Error(), http.StatusBadRequest)
return
}
consensusItem, err := signedAggregate.ToConsensus()
if err != nil {
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not convert request aggregate to consensus aggregate: " + err.Error(),
})
continue
httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
}
rpcError = s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem)
} else {
var signedAggregate structs.SignedAggregateAttestationAndProof
err = json.Unmarshal(raw, &signedAggregate)
if err != nil {
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not parse message: " + err.Error(),
})
continue
httputil.HandleError(w, "Failed to parse aggregate attestation and proof: "+err.Error(), http.StatusBadRequest)
return
}
consensusItem, err := signedAggregate.ToConsensus()
if err != nil {
failures = append(failures, &server.IndexedError{
Index: i,
Message: "Could not convert request aggregate to consensus aggregate: " + err.Error(),
})
continue
httputil.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
}
rpcError = s.CoreService.SubmitSignedAggregateSelectionProof(ctx, consensusItem)
}
if rpcError != nil {
var broadcastFailedErr *server.BroadcastFailedError
if errors.As(rpcError.Err, &broadcastFailedErr) {
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
Message: rpcError.Err.Error(),
})
continue
var aggregateBroadcastFailedError *core.AggregateBroadcastFailedError
if errors.As(rpcError.Err, &aggregateBroadcastFailedError) {
broadcastFailed = true
} else {
httputil.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
return
}
}
}
if len(failures) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusBadRequest,
Message: server.ErrIndexedValidationFail,
Failures: failures,
}
httputil.WriteError(w, failuresErr)
return
}
if len(failedBroadcasts) > 0 {
failuresErr := &server.IndexedErrorContainer{
Code: http.StatusInternalServerError,
Message: server.ErrIndexedBroadcastFail,
Failures: failedBroadcasts,
}
httputil.WriteError(w, failuresErr)
return
if broadcastFailed {
httputil.HandleError(w, "Could not broadcast one or more signed aggregated attestations", http.StatusInternalServerError)
}
}

View File

@@ -3,7 +3,6 @@ package lookup
import (
"context"
"fmt"
"math"
"strconv"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
@@ -284,13 +283,9 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, opts ...options.
return make([]*blocks.VerifiedROBlob, 0), nil
}
// Compute the first Fulu slot.
fuluForkSlot := primitives.Slot(math.MaxUint64)
if fuluForkEpoch := params.BeaconConfig().FuluForkEpoch; fuluForkEpoch != primitives.Epoch(math.MaxUint64) {
fuluForkSlot, err = slots.EpochStart(fuluForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate Fulu start slot"), Reason: core.Internal}
}
fuluForkSlot, err := slots.EpochStart(params.BeaconConfig().FuluForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate Fulu start slot"), Reason: core.Internal}
}
// Convert versioned hashes to indices if provided

View File

@@ -587,51 +587,6 @@ func TestGetBlob(t *testing.T) {
require.Equal(t, http.StatusBadRequest, core.ErrorReasonToHTTP(rpcErr.Reason))
require.StringContains(t, "not supported before", rpcErr.Err.Error())
})
t.Run("fulu fork epoch not set (MaxUint64)", func(t *testing.T) {
// Setup with Deneb fork enabled but Fulu fork epoch set to MaxUint64 (not set/far future)
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.DenebForkEpoch = 1
cfg.FuluForkEpoch = primitives.Epoch(math.MaxUint64) // Not set / far future
params.OverrideBeaconConfig(cfg)
// Create and save Deneb block and blob sidecars
denebSlot := util.SlotAtEpoch(t, cfg.DenebForkEpoch)
_, tempBlobStorage := filesystem.NewEphemeralBlobStorageAndFs(t)
denebBlockWithBlobs, denebBlobSidecars := util.GenerateTestDenebBlockWithSidecar(t, [fieldparams.RootLength]byte{}, denebSlot, 2, util.WithDenebSlot(denebSlot))
denebBlockRoot := denebBlockWithBlobs.Root()
verifiedDenebBlobs := verification.FakeVerifySliceForTest(t, denebBlobSidecars)
for i := range verifiedDenebBlobs {
err := tempBlobStorage.Save(verifiedDenebBlobs[i])
require.NoError(t, err)
}
err := db.SaveBlock(t.Context(), denebBlockWithBlobs)
require.NoError(t, err)
blocker := &BeaconDbBlocker{
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: tempBlobStorage,
}
// Should successfully retrieve blobs even when FuluForkEpoch is not set
retrievedBlobs, rpcErr := blocker.Blobs(ctx, hexutil.Encode(denebBlockRoot[:]))
require.IsNil(t, rpcErr)
require.Equal(t, 2, len(retrievedBlobs))
// Verify blob content matches
for i, retrievedBlob := range retrievedBlobs {
require.NotNil(t, retrievedBlob.BlobSidecar)
require.DeepEqual(t, denebBlobSidecars[i].Blob, retrievedBlob.Blob)
require.DeepEqual(t, denebBlobSidecars[i].KzgCommitment, retrievedBlob.KzgCommitment)
}
})
}
func TestBlobs_CommitmentOrdering(t *testing.T) {

View File

@@ -316,10 +316,6 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
if errors.Is(err, builderapi.ErrBadGateway) && block.IsBlinded() {
log.WithError(err).Info("Optimistically proposed block - builder relay temporarily unavailable, block may arrive over P2P")
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
}

View File

@@ -6,7 +6,6 @@ import (
"testing"
"time"
builderapi "github.com/OffchainLabs/prysm/v6/api/client/builder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/builder"
@@ -3635,52 +3634,4 @@ func TestServer_ProposeBeaconBlock_PostFuluBlindedBlock(t *testing.T) {
require.NotNil(t, res)
require.NotEmpty(t, res.BlockRoot)
})
t.Run("blinded block - 502 error handling", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.FuluForkEpoch = 10
params.OverrideBeaconConfig(cfg)
mockBuilder := &builderTest.MockBuilderService{
HasConfigured: true,
Cfg: &builderTest.Config{BeaconDB: db},
PayloadDeneb: &enginev1.ExecutionPayloadDeneb{},
ErrSubmitBlindedBlock: builderapi.ErrBadGateway,
}
c := &mock.ChainService{State: beaconState, Root: parentRoot[:]}
proposerServer := &Server{
ChainStartFetcher: &mockExecution.Chain{},
Eth1InfoFetcher: &mockExecution.Chain{},
Eth1BlockFetcher: &mockExecution.Chain{},
BlockReceiver: c,
BlobReceiver: c,
HeadFetcher: c,
BlockNotifier: c.BlockNotifier(),
OperationNotifier: c.OperationNotifier(),
StateGen: stategen.New(db, doublylinkedtree.New()),
TimeFetcher: c,
SyncChecker: &mockSync.Sync{IsSyncing: false},
BeaconDB: db,
BlockBuilder: mockBuilder,
P2P: &mockp2p.MockBroadcaster{},
}
blindedBlock := util.NewBlindedBeaconBlockDeneb()
blindedBlock.Message.Slot = 160 // This puts us at epoch 5 (160/32 = 5)
blindedBlock.Message.ProposerIndex = 0
blindedBlock.Message.ParentRoot = parentRoot[:]
blindedBlock.Message.StateRoot = make([]byte, 32)
req := &ethpb.GenericSignedBeaconBlock{
Block: &ethpb.GenericSignedBeaconBlock_BlindedDeneb{BlindedDeneb: blindedBlock},
}
// Should handle 502 error gracefully and continue with original blinded block
res, err := proposerServer.ProposeBeaconBlock(ctx, req)
require.NoError(t, err)
require.NotNil(t, res)
require.NotEmpty(t, res.BlockRoot)
})
}

View File

@@ -135,7 +135,6 @@ go_library(
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",

View File

@@ -192,21 +192,6 @@ var (
},
)
// Metrics for blob retrieval from execution layer based on transaction filtering
blobRetrievalAttemptsTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "blob_retrieval_attempts_total",
Help: "Count the number of times blob retrieval from EL was attempted for filtered transactions.",
},
)
blobRetrievalSuccessTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "blob_retrieval_success_total",
Help: "Count the number of times blob retrieval from EL was successful for filtered transactions.",
},
)
// Data column sidecar validation, beacon metrics specs
dataColumnSidecarVerificationRequestsCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_data_column_sidecar_processing_requests_total",

View File

@@ -238,11 +238,3 @@ func WithReconstructionRandGen(rg *rand.Rand) Option {
return nil
}
}
// WithBlobToAddress sets the address to filter blob transactions for.
func WithBlobToAddress(address string) Option {
return func(s *Service) error {
s.cfg.blobToAddress = address
return nil
}
}

View File

@@ -107,7 +107,6 @@ type config struct {
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
batchVerifierLimit int
blobToAddress string
}
// This defines the interface for interacting with block chain service

View File

@@ -19,7 +19,6 @@ import (
"github.com/OffchainLabs/prysm/v6/io/file"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
@@ -79,41 +78,6 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) {
if roBlock.Version() >= version.Fulu {
payload, err := roBlock.Block().Body().Execution()
if err != nil {
log.WithError(err).Error("Failed to extract execution payload")
return
}
txs, err := payload.Transactions()
if err != nil {
log.WithError(err).Error("Failed to extract transactions from execution payload")
return
}
for i, txBytes := range txs {
var tx types.Transaction
if err := tx.UnmarshalBinary(txBytes); err != nil {
log.WithError(err).WithField("txIndex", i).Error("Failed to unmarshal transaction")
continue
}
if tx.Type() == 3 && s.cfg.blobToAddress != "" && tx.To() != nil && tx.To().Hex() == s.cfg.blobToAddress {
blobHashes := tx.BlobHashes()
blobRetrievalAttemptsTotal.Inc()
_, err := s.cfg.executionReconstructor.GetBlobsV2(ctx, blobHashes)
if err != nil {
log.WithError(err).WithField("blobHashes", blobHashes).Error("Failed to reconstruct blob")
continue
}
blobRetrievalSuccessTotal.Inc()
log.WithFields(logrus.Fields{
"txIndex": i,
"txHash": tx.Hash().Hex(),
"slot": roBlock.Block().Slot(),
"blobHashes": blobHashes,
}).Info("Retrieved blob")
}
}
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil {
log.WithError(err).Error("Failed to process data column sidecars from execution")
return

View File

@@ -294,9 +294,6 @@ func (s *Service) validatePhase0Block(ctx context.Context, blk interfaces.ReadOn
}
if err := blocks.VerifyBlockSignatureUsingCurrentFork(parentState, blk, blockRoot); err != nil {
if errors.Is(err, blocks.ErrInvalidSignature) {
s.setBadBlock(ctx, blockRoot)
}
return nil, err
}
// In the event the block is more than an epoch ahead from its

View File

@@ -103,84 +103,11 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) {
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
require.ErrorContains(t, "invalid signature", err)
require.ErrorIs(t, err, signing.ErrSigFailedToVerify)
result := res == pubsub.ValidationReject
assert.Equal(t, true, result)
}
func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T) {
db := dbtest.SetupDB(t)
p := p2ptest.NewTestP2P(t)
ctx := t.Context()
beaconState, privKeys := util.DeterministicGenesisState(t, 100)
parentBlock := util.NewBeaconBlock()
util.SaveBlock(t, ctx, db, parentBlock)
bRoot, err := parentBlock.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveState(ctx, beaconState, bRoot))
require.NoError(t, db.SaveStateSummary(ctx, &ethpb.StateSummary{Root: bRoot[:]}))
copied := beaconState.Copy()
require.NoError(t, copied.SetSlot(1))
proposerIdx, err := helpers.BeaconProposerIndex(ctx, copied)
require.NoError(t, err)
msg := util.NewBeaconBlock()
msg.Block.ParentRoot = bRoot[:]
msg.Block.Slot = 1
msg.Block.ProposerIndex = proposerIdx
badPrivKeyIdx := proposerIdx + 1 // We generate a valid signature from a wrong private key which fails to verify
msg.Signature, err = signing.ComputeDomainAndSign(beaconState, 0, msg.Block, params.BeaconConfig().DomainBeaconProposer, privKeys[badPrivKeyIdx])
require.NoError(t, err)
stateGen := stategen.New(db, doublylinkedtree.New())
chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0),
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
Root: make([]byte, 32),
},
DB: db,
}
r := &Service{
cfg: &config{
beaconDB: db,
p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen,
},
seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10),
}
blockRoot, err := msg.Block.HashTreeRoot()
require.NoError(t, err)
// Verify block is not marked as bad initially
assert.Equal(t, false, r.hasBadBlock(blockRoot), "block should not be marked as bad initially")
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err)
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
require.ErrorContains(t, "invalid signature", err)
result := res == pubsub.ValidationReject
assert.Equal(t, true, result)
// Verify block is now marked as bad after invalid signature
assert.Equal(t, true, r.hasBadBlock(blockRoot), "block should be marked as bad after invalid signature")
}
func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
db := dbtest.SetupDB(t)
ctx := t.Context()
@@ -1049,7 +976,7 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
require.ErrorContains(t, "invalid signature", err)
require.ErrorContains(t, "could not unmarshal bytes into signature", err)
assert.Equal(t, res, pubsub.ValidationReject, "block with invalid signature should be rejected")
require.NoError(t, copied.SetSlot(2))

View File

@@ -1,3 +0,0 @@
### Added
- Delegate sszInfo HashTreeRoot to FastSSZ-generated implementations via SSZObject, enabling roots calculation for generated types while avoiding duplicate logic.

View File

@@ -1,2 +0,0 @@
### Fixed
- Fix `/eth/v1/beacon/blob_sidecars/` beacon API is the fulu fork epoch is set to the far future epoch.

View File

@@ -1,2 +0,0 @@
### Fixed
- `HasAtLeastOneIndex`: Check the index is not too high.

View File

@@ -0,0 +1,2 @@
### Added
- Rename `--subscribe-all-subnets` into `--subscribe-all-attestation-and-sync-subnets` and set `--subscribe-all-subnets` as an alias.

View File

@@ -1,2 +0,0 @@
### Ignored
- Fix (unreleased) bug where the preallocated slice for KZG Proofs was 48x bigger than it needed to be.

View File

@@ -1,3 +0,0 @@
### Fixed
- Mark the block as invalid if it has an invalid signature.

View File

@@ -1,3 +0,0 @@
### Ignored
- Remove redundant check for genesis root at startup.

View File

@@ -1,3 +0,0 @@
### Changed
- Improve returning individual message errors from Beacon API.

View File

@@ -1,3 +0,0 @@
### Fixed
- Display error messages from the server verbatim when they are not encoded as `application/json`.

View File

@@ -1,7 +0,0 @@
### Added
- SSZ-QL: Use `fastssz`'s `SizeSSZ` method for calculating the size of `Container` type.
### Changed
- SSZ-QL: Clarify `Size` method with more sophisticated `SSZType`s.

View File

@@ -1,3 +0,0 @@
### Changed
- Gracefully handle submit blind block returning 502 errors.

View File

@@ -229,10 +229,11 @@ var (
Name: "disable-debug-rpc-endpoints",
Usage: "Disables the debug Beacon API namespace.",
}
// SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation/sync subnets or not.
SubscribeToAllSubnets = &cli.BoolFlag{
Name: "subscribe-all-subnets",
Usage: "Subscribe to all possible attestation and sync subnets.",
// SubscribeToAllAttAndSyncSubnets defines a flag to specify whether to subscribe to all possible attestation/sync subnets or not.
SubscribeToAllAttAndSyncSubnets = &cli.BoolFlag{
Name: "subscribe-all-attestation-and-sync-subnets",
Aliases: []string{"subscribe-all-subnets"},
Usage: "Subscribe to all possible attestation and sync subnets.",
}
// HistoricalSlasherNode is a set of beacon node flags required for performing historical detection with a slasher.
HistoricalSlasherNode = &cli.BoolFlag{
@@ -344,9 +345,4 @@ var (
Usage: "Maximum number of signatures to batch verify at once for beacon attestation p2p gossip.",
Value: 1000,
}
BlobToAddressFlag = &cli.StringFlag{
Name: "blob-to-address",
Usage: "Ethereum address to filter type 3 (blob) transactions for processing. When set, only blob transactions sent to this address will be used to try to retrieve blobs from the execution layer and save them.",
Value: "",
}
)

View File

@@ -41,7 +41,7 @@ func Init(c *GlobalFlags) {
func ConfigureGlobalFlags(ctx *cli.Context) {
cfg := &GlobalFlags{}
if ctx.Bool(SubscribeToAllSubnets.Name) {
if ctx.Bool(SubscribeToAllAttAndSyncSubnets.Name) {
log.Warning("Subscribing to all attestation subnets")
cfg.SubscribeToAllSubnets = true
}

View File

@@ -64,7 +64,7 @@ var appFlags = []cli.Flag{
flags.InteropMockEth1DataVotesFlag,
flags.SlotsPerArchivedPoint,
flags.DisableDebugRPCEndpoints,
flags.SubscribeToAllSubnets,
flags.SubscribeToAllAttAndSyncSubnets,
flags.SubscribeAllDataSubnets,
flags.HistoricalSlasherNode,
flags.ChainID,
@@ -87,7 +87,6 @@ var appFlags = []cli.Flag{
flags.BeaconDBPruning,
flags.PrunerRetentionEpochs,
flags.EnableBuilderSSZ,
flags.BlobToAddressFlag,
cmd.MinimalConfigFlag,
cmd.E2EConfigFlag,
cmd.RPCMaxPageSizeFlag,

View File

@@ -106,7 +106,7 @@ var appHelpFlagGroups = []flagGroup{
flags.MaxConcurrentDials,
flags.MinPeersPerSubnet,
flags.MinSyncPeers,
flags.SubscribeToAllSubnets,
flags.SubscribeToAllAttAndSyncSubnets,
flags.SubscribeAllDataSubnets,
},
},

View File

@@ -219,8 +219,8 @@ func TestCallWithdrawalEndpoint_Errors(t *testing.T) {
if r.Method == http.MethodPost && r.RequestURI == "/eth/v1/beacon/pool/bls_to_execution_changes" {
w.WriteHeader(400)
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(&server.IndexedErrorContainer{
Failures: []*server.IndexedError{
err = json.NewEncoder(w).Encode(&server.IndexedVerificationFailureError{
Failures: []*server.IndexedVerificationFailure{
{Index: 0, Message: "Could not validate SignedBLSToExecutionChange"},
},
})

View File

@@ -11,7 +11,6 @@ go_library(
"path.go",
"query.go",
"ssz_info.go",
"ssz_object.go",
"ssz_type.go",
"tag_parser.go",
"vector.go",
@@ -24,6 +23,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"analyzer_test.go",
"path_test.go",
"query_test.go",
"tag_parser_test.go",

View File

@@ -10,18 +10,18 @@ import (
const offsetBytes = 4
// AnalyzeObject analyzes given object and returns its SSZ information.
func AnalyzeObject(obj SSZObject) (*sszInfo, error) {
value := reflect.ValueOf(obj)
func AnalyzeObject(obj any) (*sszInfo, error) {
value := dereferencePointer(obj)
info, err := analyzeType(value, nil)
info, err := analyzeType(value.Type(), nil)
if err != nil {
return nil, fmt.Errorf("could not analyze type %s: %w", value.Type().Name(), err)
}
// Populate variable-length information using the actual value.
err = PopulateVariableLengthInfo(info, value)
err = PopulateVariableLengthInfo(info, value.Interface())
if err != nil {
return nil, fmt.Errorf("could not populate variable length info for type %s: %w", value.Type().Name(), err)
return nil, fmt.Errorf("could not populate variable length info: %w", err)
}
return info, nil
@@ -30,13 +30,13 @@ func AnalyzeObject(obj SSZObject) (*sszInfo, error) {
// PopulateVariableLengthInfo populates runtime information for SSZ fields of variable-sized types.
// This function updates the sszInfo structure with actual lengths and offsets that can only
// be determined at runtime for variable-sized items like Lists and variable-sized Container fields.
func PopulateVariableLengthInfo(sszInfo *sszInfo, value reflect.Value) error {
func PopulateVariableLengthInfo(sszInfo *sszInfo, value any) error {
if sszInfo == nil {
return errors.New("sszInfo is nil")
}
if !value.IsValid() {
return errors.New("value is invalid")
if value == nil {
return errors.New("value is nil")
}
// Short circuit: If the type is fixed-sized, we don't need to fill in the info.
@@ -56,18 +56,18 @@ func PopulateVariableLengthInfo(sszInfo *sszInfo, value reflect.Value) error {
return errors.New("listInfo is nil")
}
if value.Kind() != reflect.Slice {
return fmt.Errorf("expected slice for List type, got %v", value.Kind())
val := reflect.ValueOf(value)
if val.Kind() != reflect.Slice {
return fmt.Errorf("expected slice for List type, got %v", val.Kind())
}
length := value.Len()
length := val.Len()
if listInfo.element.isVariable {
listInfo.elementSizes = make([]uint64, 0, length)
// Populate nested variable-sized type element lengths recursively.
for i := range length {
if err := PopulateVariableLengthInfo(listInfo.element, value.Index(i)); err != nil {
if err := PopulateVariableLengthInfo(listInfo.element, val.Index(i).Interface()); err != nil {
return fmt.Errorf("could not populate nested list element at index %d: %w", i, err)
}
listInfo.elementSizes = append(listInfo.elementSizes, listInfo.element.Size())
@@ -91,7 +91,8 @@ func PopulateVariableLengthInfo(sszInfo *sszInfo, value reflect.Value) error {
return errors.New("bitlistInfo is nil")
}
if err := bitlistInfo.SetLengthFromBytes(value.Bytes()); err != nil {
val := reflect.ValueOf(value)
if err := bitlistInfo.SetLengthFromBytes(val.Bytes()); err != nil {
return fmt.Errorf("could not set bitlist length from bytes: %w", err)
}
@@ -104,21 +105,11 @@ func PopulateVariableLengthInfo(sszInfo *sszInfo, value reflect.Value) error {
return fmt.Errorf("could not get container info: %w", err)
}
if containerInfo == nil {
return errors.New("containerInfo is nil")
}
// Dereference first in case value is a pointer.
derefValue := dereferencePointer(value)
if derefValue.Kind() != reflect.Struct {
return fmt.Errorf("expected struct for Container type, got %v", derefValue.Kind())
}
// Reset the pointer to the new value.
sszInfo.source = castToSSZObject(derefValue)
// Start with the end offset of this Container.
currentOffset := containerInfo.fixedOffset
// Start with the fixed size of this Container.
currentOffset := sszInfo.FixedSize()
for _, fieldName := range containerInfo.order {
fieldInfo := containerInfo.fields[fieldName]
@@ -134,15 +125,13 @@ func PopulateVariableLengthInfo(sszInfo *sszInfo, value reflect.Value) error {
// Recursively populate variable-sized fields.
fieldValue := derefValue.FieldByName(fieldInfo.goFieldName)
if err := PopulateVariableLengthInfo(childSszInfo, fieldValue); err != nil {
if err := PopulateVariableLengthInfo(childSszInfo, fieldValue.Interface()); err != nil {
return fmt.Errorf("could not populate from value for field %s: %w", fieldName, err)
}
// Each variable-sized element needs an offset entry.
if listInfo, err := childSszInfo.ListInfo(); err == nil && listInfo != nil {
if listInfo.element.isVariable {
currentOffset += listInfo.Length() * offsetBytes
}
if childSszInfo.sszType == List {
currentOffset += childSszInfo.listInfo.OffsetBytes()
}
// Set the actual offset for variable-sized fields.
@@ -157,64 +146,66 @@ func PopulateVariableLengthInfo(sszInfo *sszInfo, value reflect.Value) error {
}
}
// analyzeType is an entry point that inspects a reflect.Value and computes its SSZ layout information.
func analyzeType(value reflect.Value, tag *reflect.StructTag) (*sszInfo, error) {
switch value.Kind() {
// analyzeType is an entry point that inspects a reflect.Type and computes its SSZ layout information.
func analyzeType(typ reflect.Type, tag *reflect.StructTag) (*sszInfo, error) {
switch typ.Kind() {
// Basic types (e.g., uintN where N is 8, 16, 32, 64)
// NOTE: uint128 and uint256 are represented as []byte in Go,
// so we handle them as slices. See `analyzeHomogeneousColType`.
case reflect.Uint64, reflect.Uint32, reflect.Uint16, reflect.Uint8, reflect.Bool:
return analyzeBasicType(value)
return analyzeBasicType(typ)
case reflect.Slice:
return analyzeHomogeneousColType(value, tag)
return analyzeHomogeneousColType(typ, tag)
case reflect.Struct:
return analyzeContainerType(value)
return analyzeContainerType(typ)
case reflect.Pointer:
derefValue := dereferencePointer(value)
return analyzeType(derefValue, tag)
case reflect.Ptr:
// Dereference pointer types.
return analyzeType(typ.Elem(), tag)
default:
return nil, fmt.Errorf("unsupported type %v for SSZ calculation", value.Kind())
return nil, fmt.Errorf("unsupported type %v for SSZ calculation", typ.Kind())
}
}
// analyzeBasicType analyzes SSZ basic types (uintN, bool) and returns its info.
func analyzeBasicType(value reflect.Value) (*sszInfo, error) {
var sszType SSZType
switch value.Kind() {
case reflect.Uint64:
sszType = Uint64
case reflect.Uint32:
sszType = Uint32
case reflect.Uint16:
sszType = Uint16
case reflect.Uint8:
sszType = Uint8
case reflect.Bool:
sszType = Boolean
default:
return nil, fmt.Errorf("unsupported basic type %v for SSZ calculation", value.Kind())
}
func analyzeBasicType(typ reflect.Type) (*sszInfo, error) {
sszInfo := &sszInfo{
sszType: sszType,
typ: value.Type(),
typ: typ,
// Every basic type is fixed-size and not variable.
isVariable: false,
}
switch typ.Kind() {
case reflect.Uint64:
sszInfo.sszType = UintN
sszInfo.fixedSize = 8
case reflect.Uint32:
sszInfo.sszType = UintN
sszInfo.fixedSize = 4
case reflect.Uint16:
sszInfo.sszType = UintN
sszInfo.fixedSize = 2
case reflect.Uint8:
sszInfo.sszType = UintN
sszInfo.fixedSize = 1
case reflect.Bool:
sszInfo.sszType = Boolean
sszInfo.fixedSize = 1
default:
return nil, fmt.Errorf("unsupported basic type %v for SSZ calculation", typ.Kind())
}
return sszInfo, nil
}
// analyzeHomogeneousColType analyzes homogeneous collection types (e.g., List, Vector, Bitlist, Bitvector) and returns its SSZ info.
func analyzeHomogeneousColType(value reflect.Value, tag *reflect.StructTag) (*sszInfo, error) {
if value.Kind() != reflect.Slice {
return nil, fmt.Errorf("can only analyze slice types, got %v", value.Kind())
func analyzeHomogeneousColType(typ reflect.Type, tag *reflect.StructTag) (*sszInfo, error) {
if typ.Kind() != reflect.Slice {
return nil, fmt.Errorf("can only analyze slice types, got %v", typ.Kind())
}
// Parse the first dimension from the tag and get remaining tag for element
@@ -226,12 +217,8 @@ func analyzeHomogeneousColType(value reflect.Value, tag *reflect.StructTag) (*ss
return nil, errors.New("ssz tag is required for slice types")
}
// NOTE: Elem() won't panic because value is guaranteed to be a slice here.
elementType := value.Type().Elem()
// Analyze element type with remaining dimensions
// Note that it is enough to analyze by a zero value,
// as the actual value with variable-sized type will be populated later.
elementInfo, err := analyzeType(reflect.New(elementType), remainingTag)
elementInfo, err := analyzeType(typ.Elem(), remainingTag)
if err != nil {
return nil, fmt.Errorf("could not analyze element type for homogeneous collection: %w", err)
}
@@ -243,7 +230,7 @@ func analyzeHomogeneousColType(value reflect.Value, tag *reflect.StructTag) (*ss
return nil, fmt.Errorf("could not get list limit: %w", err)
}
return analyzeListType(value, elementInfo, limit, sszDimension.isBitfield)
return analyzeListType(typ, elementInfo, limit, sszDimension.isBitfield)
}
// 2. Handle Vector/Bitvector type
@@ -253,7 +240,7 @@ func analyzeHomogeneousColType(value reflect.Value, tag *reflect.StructTag) (*ss
return nil, fmt.Errorf("could not get vector length: %w", err)
}
return analyzeVectorType(value, elementInfo, length, sszDimension.isBitfield)
return analyzeVectorType(typ, elementInfo, length, sszDimension.isBitfield)
}
// Parsing ssz tag doesn't provide enough information to determine the collection type,
@@ -262,12 +249,13 @@ func analyzeHomogeneousColType(value reflect.Value, tag *reflect.StructTag) (*ss
}
// analyzeListType analyzes SSZ List/Bitlist type and returns its SSZ info.
func analyzeListType(value reflect.Value, elementInfo *sszInfo, limit uint64, isBitfield bool) (*sszInfo, error) {
func analyzeListType(typ reflect.Type, elementInfo *sszInfo, limit uint64, isBitfield bool) (*sszInfo, error) {
if isBitfield {
return &sszInfo{
sszType: Bitlist,
typ: value.Type(),
typ: typ,
fixedSize: offsetBytes,
isVariable: true,
bitlistInfo: &bitlistInfo{
@@ -282,8 +270,9 @@ func analyzeListType(value reflect.Value, elementInfo *sszInfo, limit uint64, is
return &sszInfo{
sszType: List,
typ: value.Type(),
typ: typ,
fixedSize: offsetBytes,
isVariable: true,
listInfo: &listInfo{
@@ -294,12 +283,14 @@ func analyzeListType(value reflect.Value, elementInfo *sszInfo, limit uint64, is
}
// analyzeVectorType analyzes SSZ Vector/Bitvector type and returns its SSZ info.
func analyzeVectorType(value reflect.Value, elementInfo *sszInfo, length uint64, isBitfield bool) (*sszInfo, error) {
func analyzeVectorType(typ reflect.Type, elementInfo *sszInfo, length uint64, isBitfield bool) (*sszInfo, error) {
if isBitfield {
return &sszInfo{
sszType: Bitvector,
typ: value.Type(),
typ: typ,
// Size in bytes
fixedSize: length,
isVariable: false,
bitvectorInfo: &bitvectorInfo{
@@ -320,8 +311,9 @@ func analyzeVectorType(value reflect.Value, elementInfo *sszInfo, length uint64,
return &sszInfo{
sszType: Vector,
typ: value.Type(),
typ: typ,
fixedSize: length * elementInfo.Size(),
isVariable: false,
vectorInfo: &vectorInfo{
@@ -332,36 +324,44 @@ func analyzeVectorType(value reflect.Value, elementInfo *sszInfo, length uint64,
}
// analyzeContainerType analyzes SSZ Container type and returns its SSZ info.
func analyzeContainerType(value reflect.Value) (*sszInfo, error) {
if value.Kind() != reflect.Struct {
return nil, fmt.Errorf("can only analyze struct types, got %v", value.Kind())
func analyzeContainerType(typ reflect.Type) (*sszInfo, error) {
if typ.Kind() != reflect.Struct {
return nil, fmt.Errorf("can only analyze struct types, got %v", typ.Kind())
}
containerTyp := value.Type()
fields := make(map[string]*fieldInfo)
order := make([]string, 0)
order := make([]string, 0, typ.NumField())
isVariable := false
sszInfo := &sszInfo{
sszType: Container,
typ: typ,
}
var currentOffset uint64
for i := 0; i < value.NumField(); i++ {
structFieldInfo := containerTyp.Field(i)
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
// Protobuf-generated structs contain private fields we must skip.
// e.g., state, sizeCache, unknownFields, etc.
if !structFieldInfo.IsExported() {
if !field.IsExported() {
continue
}
tag := structFieldInfo.Tag
goFieldName := structFieldInfo.Name
fieldName, err := parseFieldNameFromTag(tag)
if err != nil {
return nil, fmt.Errorf("could not parse field name from tag for field %s: %w", goFieldName, err)
// The JSON tag contains the field name in the first part.
// e.g., "attesting_indices,omitempty" -> "attesting_indices".
jsonTag := field.Tag.Get("json")
if jsonTag == "" {
return nil, fmt.Errorf("field %s has no JSON tag", field.Name)
}
// NOTE: `fieldName` is a string with `snake_case` format (following consensus specs).
fieldName := strings.Split(jsonTag, ",")[0]
if fieldName == "" {
return nil, fmt.Errorf("field %s has an empty JSON tag", field.Name)
}
// Analyze each field so that we can complete full SSZ information.
info, err := analyzeType(value.Field(i), &tag)
info, err := analyzeType(field.Type, &field.Tag)
if err != nil {
return nil, fmt.Errorf("could not analyze type for field %s: %w", fieldName, err)
}
@@ -370,7 +370,7 @@ func analyzeContainerType(value reflect.Value) (*sszInfo, error) {
fields[fieldName] = &fieldInfo{
sszInfo: info,
offset: currentOffset,
goFieldName: goFieldName,
goFieldName: field.Name,
}
// Persist order
order = append(order, fieldName)
@@ -379,87 +379,34 @@ func analyzeContainerType(value reflect.Value) (*sszInfo, error) {
if info.isVariable {
// If one of the fields is variable-sized,
// the entire struct is considered variable-sized.
isVariable = true
sszInfo.isVariable = true
currentOffset += offsetBytes
} else {
currentOffset += info.Size()
currentOffset += info.fixedSize
}
}
return &sszInfo{
sszType: Container,
typ: containerTyp,
source: castToSSZObject(value),
sszInfo.fixedSize = currentOffset
sszInfo.containerInfo = &containerInfo{
fields: fields,
order: order,
}
isVariable: isVariable,
containerInfo: &containerInfo{
fields: fields,
order: order,
fixedOffset: currentOffset,
},
}, nil
return sszInfo, nil
}
// dereferencePointer dereferences a pointer to get the underlying value using reflection.
func dereferencePointer(value reflect.Value) reflect.Value {
derefValue := value
if value.IsValid() && value.Kind() == reflect.Pointer {
func dereferencePointer(obj any) reflect.Value {
value := reflect.ValueOf(obj)
if value.Kind() == reflect.Ptr {
if value.IsNil() {
// Create a zero value if the pointer is nil.
derefValue = reflect.New(value.Type().Elem()).Elem()
// If we encounter a nil pointer before the end of the path, we can still proceed
// by analyzing the type, not the value.
value = reflect.New(value.Type().Elem()).Elem()
} else {
derefValue = value.Elem()
value = value.Elem()
}
}
return derefValue
}
// castToSSZObject attempts to cast a reflect.Value to the SSZObject interface.
// If failed, it returns nil.
func castToSSZObject(value reflect.Value) SSZObject {
if !value.IsValid() {
return nil
}
// SSZObject is only implemented by struct types.
if value.Kind() != reflect.Struct {
return nil
}
// To cast to SSZObject, we need the addressable value.
if !value.CanAddr() {
return nil
}
if sszObj, ok := value.Addr().Interface().(SSZObject); ok {
return sszObj
}
return nil
}
// parseFieldNameFromTag extracts the field name (`snake_case` format)
// from a struct tag by looking for the json tag.
// The JSON tag contains the field name in the first part.
// e.g., "attesting_indices,omitempty" -> "attesting_indices".
func parseFieldNameFromTag(tag reflect.StructTag) (string, error) {
jsonTag := tag.Get("json")
if jsonTag == "" {
return "", errors.New("no JSON tag found")
}
substrings := strings.Split(jsonTag, ",")
if len(substrings) == 0 {
return "", errors.New("invalid JSON tag format")
}
fieldName := strings.TrimSpace(substrings[0])
if fieldName == "" {
return "", errors.New("empty field name")
}
return fieldName, nil
return value
}

View File

@@ -0,0 +1,17 @@
package query_test
import (
"testing"
"github.com/OffchainLabs/prysm/v6/encoding/ssz/query"
sszquerypb "github.com/OffchainLabs/prysm/v6/proto/ssz_query"
"github.com/OffchainLabs/prysm/v6/testing/require"
)
func TestAnalyzeSSZInfo(t *testing.T) {
info, err := query.AnalyzeObject(&sszquerypb.FixedTestContainer{})
require.NoError(t, err)
require.NotNil(t, info, "Expected non-nil SSZ info")
require.Equal(t, uint64(565), info.FixedSize())
}

View File

@@ -13,12 +13,3 @@ func (v *bitvectorInfo) Length() uint64 {
return v.length
}
func (v *bitvectorInfo) Size() uint64 {
if v == nil {
return 0
}
// Size in bytes.
return v.length / 8
}

View File

@@ -3,11 +3,9 @@ package query
// containerInfo has
// 1. fields: a field map that maps a field's JSON name to its sszInfo for nested Containers
// 2. order: a list of field names in the order they should be serialized
// 3. fixedOffset: the total size of the fixed part of the container
type containerInfo struct {
fields map[string]*fieldInfo
order []string
fixedOffset uint64
fields map[string]*fieldInfo
order []string
}
type fieldInfo struct {

View File

@@ -71,3 +71,17 @@ func (l *listInfo) Size() uint64 {
}
return totalSize
}
// OffsetBytes returns the total number of offset bytes used for the list elements.
// Each variable-sized element uses 4 bytes to store its offset.
func (l *listInfo) OffsetBytes() uint64 {
if l == nil {
return 0
}
if !l.element.isVariable {
return 0
}
return offsetBytes * l.length
}

View File

@@ -11,34 +11,6 @@ import (
"github.com/prysmaticlabs/go-bitfield"
)
func TestSize(t *testing.T) {
tests := []struct {
name string
obj query.SSZObject
expectedSize uint64
}{
{
name: "FixedTestContainer",
obj: &sszquerypb.FixedTestContainer{},
expectedSize: 565,
},
{
name: "VariableTestContainer",
obj: &sszquerypb.VariableTestContainer{},
expectedSize: 128,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
info, err := query.AnalyzeObject(tt.obj)
require.NoError(t, err)
require.NotNil(t, info)
require.Equal(t, tt.expectedSize, info.Size())
})
}
}
func TestCalculateOffsetAndLength(t *testing.T) {
type testCase struct {
name string
@@ -252,56 +224,6 @@ func TestCalculateOffsetAndLength(t *testing.T) {
})
}
func TestHashTreeRoot(t *testing.T) {
tests := []struct {
name string
obj query.SSZObject
}{
{
name: "FixedNestedContainer",
obj: &sszquerypb.FixedNestedContainer{
Value1: 42,
Value2: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08},
},
},
{
name: "FixedTestContainer",
obj: createFixedTestContainer(),
},
{
name: "VariableNestedContainer",
obj: &sszquerypb.VariableNestedContainer{
Value1: 84,
FieldListUint64: []uint64{1, 2, 3, 4, 5},
NestedListField: [][]byte{
{0x0a, 0x0b, 0x0c},
{0x1a, 0x1b, 0x1c, 0x1d},
},
},
},
{
name: "VariableTestContainer",
obj: createVariableTestContainer(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Analyze the object to get its sszInfo
info, err := query.AnalyzeObject(tt.obj)
require.NoError(t, err)
require.NotNil(t, info, "Expected non-nil SSZ info")
// Call HashTreeRoot on the sszInfo and compare results
hashTreeRoot, err := info.HashTreeRoot()
require.NoError(t, err, "HashTreeRoot should not return an error")
expectedHashTreeRoot, err := tt.obj.HashTreeRoot()
require.NoError(t, err, "HashTreeRoot on original object should not return an error")
require.Equal(t, expectedHashTreeRoot, hashTreeRoot, "HashTreeRoot from sszInfo should match original object's HashTreeRoot")
})
}
}
func TestRoundTripSszInfo(t *testing.T) {
specs := []testutil.TestSpec{
getFixedTestContainerSpec(),
@@ -380,7 +302,7 @@ func getFixedTestContainerSpec() testutil.TestSpec {
return testutil.TestSpec{
Name: "FixedTestContainer",
Type: &sszquerypb.FixedTestContainer{},
Type: sszquerypb.FixedTestContainer{},
Instance: testContainer,
PathTests: []testutil.PathTest{
// Basic types
@@ -517,7 +439,7 @@ func getVariableTestContainerSpec() testutil.TestSpec {
return testutil.TestSpec{
Name: "VariableTestContainer",
Type: &sszquerypb.VariableTestContainer{},
Type: sszquerypb.VariableTestContainer{},
Instance: testContainer,
PathTests: []testutil.PathTest{
// Fixed leading field

View File

@@ -13,11 +13,11 @@ type sszInfo struct {
sszType SSZType
// Type in Go. Need this for unmarshaling.
typ reflect.Type
// Original object being analyzed
source SSZObject
// isVariable is true if the struct contains any variable-size fields.
isVariable bool
// fixedSize is the total size of the struct's fixed part.
fixedSize uint64
// For Container types.
containerInfo *containerInfo
@@ -35,38 +35,46 @@ type sszInfo struct {
bitvectorInfo *bitvectorInfo
}
func (info *sszInfo) FixedSize() uint64 {
if info == nil {
return 0
}
return info.fixedSize
}
func (info *sszInfo) Size() uint64 {
if info == nil {
return 0
}
switch info.sszType {
case Uint8:
return 1
case Uint16:
return 2
case Uint32:
return 4
case Uint64:
return 8
case Boolean:
return 1
case Container:
// Using existing API if the pointer is available.
if info.source != nil {
return uint64(info.source.SizeSSZ())
}
// Easy case: if the type is not variable, we can return the fixed size.
if !info.isVariable {
return info.fixedSize
}
return 0
case Vector:
return info.vectorInfo.Size()
switch info.sszType {
case List:
return info.listInfo.Size()
case Bitvector:
return info.bitvectorInfo.Size()
case Bitlist:
return info.bitlistInfo.Size()
case Container:
size := info.fixedSize
for _, fieldInfo := range info.containerInfo.fields {
if !fieldInfo.sszInfo.isVariable {
continue
}
// Include offset bytes inside nested lists.
if fieldInfo.sszInfo.sszType == List {
size += fieldInfo.sszInfo.listInfo.OffsetBytes()
}
size += fieldInfo.sszInfo.Size()
}
return size
default:
return 0
}
@@ -183,7 +191,7 @@ func printRecursive(info *sszInfo, builder *strings.Builder, prefix string) {
switch info.sszType {
case Container:
builder.WriteString(fmt.Sprintf("%s (%s / size: %d)\n", info, sizeDesc, info.Size()))
builder.WriteString(fmt.Sprintf("%s (%s / fixed size: %d, total size: %d)\n", info, sizeDesc, info.FixedSize(), info.Size()))
for i, key := range info.containerInfo.order {
connector := "├─"

View File

@@ -1,23 +0,0 @@
package query
import "errors"
type SSZObject interface {
HashTreeRoot() ([32]byte, error)
SizeSSZ() int
}
// HashTreeRoot calls the HashTreeRoot method on the stored interface if it implements SSZObject.
// Returns the 32-byte hash tree root or an error if the interface doesn't support hashing.
func (info *sszInfo) HashTreeRoot() ([32]byte, error) {
if info == nil {
return [32]byte{}, errors.New("sszInfo is nil")
}
if info.source == nil {
return [32]byte{}, errors.New("sszInfo.source is nil")
}
// Check if the value implements the Hashable interface
return info.source.HashTreeRoot()
}

View File

@@ -9,10 +9,8 @@ type SSZType int
// SSZ type constants.
const (
// Basic types
Uint8 SSZType = iota
Uint16
Uint32
Uint64
UintN SSZType = iota
Byte
Boolean
// Composite types
@@ -29,14 +27,10 @@ const (
func (t SSZType) String() string {
switch t {
case Uint8:
return "Uint8"
case Uint16:
return "Uint16"
case Uint32:
return "Uint32"
case Uint64:
return "Uint64"
case UintN:
return "UintN"
case Byte:
return "Byte"
case Boolean:
return "Boolean"
case Container:

View File

@@ -1,7 +1,6 @@
package testutil
import (
"reflect"
"testing"
"github.com/OffchainLabs/prysm/v6/encoding/ssz/query"
@@ -15,7 +14,7 @@ func RunStructTest(t *testing.T, spec TestSpec) {
require.NoError(t, err)
testInstance := spec.Instance
err = query.PopulateVariableLengthInfo(info, reflect.ValueOf(testInstance))
err = query.PopulateVariableLengthInfo(info, testInstance)
require.NoError(t, err)
marshaller, ok := testInstance.(ssz.Marshaler)

View File

@@ -1,7 +1,5 @@
package testutil
import "github.com/OffchainLabs/prysm/v6/encoding/ssz/query"
type PathTest struct {
Path string
Expected any
@@ -9,7 +7,7 @@ type PathTest struct {
type TestSpec struct {
Name string
Type query.SSZObject
Instance query.SSZObject
Type any
Instance any
PathTests []PathTest
}

View File

@@ -25,15 +25,3 @@ func (v *vectorInfo) Element() (*sszInfo, error) {
return v.element, nil
}
func (v *vectorInfo) Size() uint64 {
if v == nil {
return 0
}
if v.element == nil {
return 0
}
return v.length * v.element.Size()
}

View File

@@ -202,10 +202,10 @@ func TestBeaconApiValidatorClient_ProposeBeaconBlockError_ThenPass(t *testing.T)
func TestBeaconApiValidatorClient_ProposeBeaconBlockAllTypes(t *testing.T) {
tests := []struct {
name string
block *ethpb.GenericSignedBeaconBlock
expectedPath string
wantErr bool
name string
block *ethpb.GenericSignedBeaconBlock
expectedPath string
wantErr bool
errorMessage string
}{
{
@@ -374,7 +374,7 @@ func TestBeaconApiValidatorClient_ProposeBeaconBlockHTTPErrors(t *testing.T) {
gomock.Any(),
gomock.Any(),
).Return(nil, nil, tt.sszError).Times(1)
if tt.expectJSON {
// When SSZ fails, it falls back to JSON
jsonRestHandler.EXPECT().Post(

View File

@@ -121,7 +121,7 @@ func (m *MockJsonRestHandler) PostSSZ(ctx context.Context, endpoint string, head
ret0, _ := ret[0].([]byte)
ret1, _ := ret[1].(http.Header)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
return ret0,ret1,ret2
}
// Post indicates an expected call of Post.

View File

@@ -135,7 +135,7 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
decoder := json.NewDecoder(bytes.NewBuffer(body))
errorJson := &httputil.DefaultJsonError{}
if err = decoder.Decode(errorJson); err != nil {
return nil, nil, fmt.Errorf("HTTP request for %s unsuccessful (%d: %s)", httpResp.Request.URL, httpResp.StatusCode, string(body))
return nil, nil, errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL)
}
return nil, nil, errorJson
}
@@ -241,7 +241,7 @@ func (c *BeaconApiRestHandler) PostSSZ(
decoder := json.NewDecoder(bytes.NewBuffer(body))
errorJson := &httputil.DefaultJsonError{}
if err = decoder.Decode(errorJson); err != nil {
return nil, nil, fmt.Errorf("HTTP request for %s unsuccessful (%d: %s)", httpResp.Request.URL, httpResp.StatusCode, string(body))
return nil, nil, errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL)
}
return nil, nil, errorJson
}

View File

@@ -344,18 +344,4 @@ func Test_decodeResp(t *testing.T) {
err = decodeResp(r, nil)
assert.ErrorContains(t, "failed to decode response body into error json", err)
})
t.Run("500 not JSON", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {"text/plain"}},
Request: &http.Request{},
}
err = decodeResp(r, nil)
assert.ErrorContains(t, "HTTP request unsuccessful (500: foo)", err)
})
}