Compare commits

..

1 Commits

Author SHA1 Message Date
james-prysm
23167cc1c2 adding in debug flag 2025-06-24 11:05:50 -05:00
17 changed files with 175 additions and 1331 deletions

View File

@@ -213,7 +213,7 @@ func (s BlobSidecarsByRootReq) Len() int {
// ====================================
// DataColumnsByRootIdentifiers section
// ====================================
var _ ssz.Marshaler = DataColumnsByRootIdentifiers{}
var _ ssz.Marshaler = (*DataColumnsByRootIdentifiers)(nil)
var _ ssz.Unmarshaler = (*DataColumnsByRootIdentifiers)(nil)
// DataColumnsByRootIdentifiers is used to specify a list of data column targets (root+index) in a DataColumnSidecarsByRoot RPC request.
@@ -275,33 +275,33 @@ func (d *DataColumnsByRootIdentifiers) UnmarshalSSZ(buf []byte) error {
return nil
}
func (d DataColumnsByRootIdentifiers) MarshalSSZ() ([]byte, error) {
func (d *DataColumnsByRootIdentifiers) MarshalSSZ() ([]byte, error) {
var err error
count := len(d)
count := len(*d)
maxSize := params.BeaconConfig().MaxRequestBlocksDeneb
if uint64(count) > maxSize {
return nil, errors.Errorf("data column identifiers list exceeds max size: %d > %d", count, maxSize)
}
if len(d) == 0 {
if len(*d) == 0 {
return []byte{}, nil
}
sizes := make([]uint32, count)
valTotal := uint32(0)
for i, elem := range d {
for i, elem := range *d {
if elem == nil {
return nil, errors.New("nil item in DataColumnsByRootIdentifiers list")
}
sizes[i] = uint32(elem.SizeSSZ())
valTotal += sizes[i]
}
offSize := uint32(4 * len(d))
offSize := uint32(4 * len(*d))
out := make([]byte, offSize, offSize+valTotal)
for i := range sizes {
binary.LittleEndian.PutUint32(out[i*4:i*4+4], offSize)
offSize += sizes[i]
}
for _, elem := range d {
for _, elem := range *d {
out, err = elem.MarshalSSZTo(out)
if err != nil {
return nil, err
@@ -312,7 +312,7 @@ func (d DataColumnsByRootIdentifiers) MarshalSSZ() ([]byte, error) {
}
// MarshalSSZTo implements ssz.Marshaler. It appends the serialized DataColumnSidecarsByRootReq value to the provided byte slice.
func (d DataColumnsByRootIdentifiers) MarshalSSZTo(dst []byte) ([]byte, error) {
func (d *DataColumnsByRootIdentifiers) MarshalSSZTo(dst []byte) ([]byte, error) {
obj, err := d.MarshalSSZ()
if err != nil {
return nil, err
@@ -321,11 +321,11 @@ func (d DataColumnsByRootIdentifiers) MarshalSSZTo(dst []byte) ([]byte, error) {
}
// SizeSSZ implements ssz.Marshaler. It returns the size of the serialized representation.
func (d DataColumnsByRootIdentifiers) SizeSSZ() int {
func (d *DataColumnsByRootIdentifiers) SizeSSZ() int {
size := 0
for i := 0; i < len(d); i++ {
for i := 0; i < len(*d); i++ {
size += 4
size += (d)[i].SizeSSZ()
size += (*d)[i].SizeSSZ()
}
return size
}

View File

@@ -10,13 +10,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
@@ -38,9 +36,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
@@ -49,7 +45,6 @@ go_test(
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/state/stategen/mock:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",

View File

@@ -3,15 +3,12 @@ package lookup
import (
"context"
"fmt"
"math"
"strconv"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
"github.com/OffchainLabs/prysm/v6/beacon-chain/rpc/core"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -52,7 +49,6 @@ type BeaconDbBlocker struct {
ChainInfoFetcher blockchain.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
BlobStorage *filesystem.BlobStorage
DataColumnStorage *filesystem.DataColumnStorage
}
// Block returns the beacon block for a given identifier. The identifier can be one of:
@@ -216,190 +212,64 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, indices []int) (
root := bytesutil.ToBytes32(rootSlice)
roSignedBlock, err := p.BeaconDB.Block(ctx, root)
b, err := p.BeaconDB.Block(ctx, root)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrapf(err, "failed to retrieve block %#x from db", rootSlice), Reason: core.Internal}
}
if roSignedBlock == nil {
if b == nil {
return nil, &core.RpcError{Err: fmt.Errorf("block %#x not found in db", rootSlice), Reason: core.NotFound}
}
// If block is not in the retention window, return 200 w/ empty list
if !p.BlobStorage.WithinRetentionPeriod(slots.ToEpoch(roSignedBlock.Block().Slot()), slots.ToEpoch(p.GenesisTimeFetcher.CurrentSlot())) {
// if block is not in the retention window, return 200 w/ empty list
if !p.BlobStorage.WithinRetentionPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(p.GenesisTimeFetcher.CurrentSlot())) {
return make([]*blocks.VerifiedROBlob, 0), nil
}
roBlock := roSignedBlock.Block()
commitments, err := roBlock.Body().BlobKzgCommitments()
commitments, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, &core.RpcError{Err: errors.Wrapf(err, "failed to retrieve kzg commitments from block %#x", rootSlice), Reason: core.Internal}
}
// If there are no commitments return 200 w/ empty list
// if there are no commitments return 200 w/ empty list
if len(commitments) == 0 {
return make([]*blocks.VerifiedROBlob, 0), nil
}
// Compute the first Fulu slot.
fuluForkEpoch := params.BeaconConfig().FuluForkEpoch
fuluForkSlot := primitives.Slot(math.MaxUint64)
if fuluForkEpoch != primitives.Epoch(math.MaxUint64) {
fuluForkSlot, err = slots.EpochStart(fuluForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate peerDAS start slot"), Reason: core.Internal}
}
}
sum := p.BlobStorage.Summary(root)
if roBlock.Slot() >= fuluForkSlot {
roBlock, err := blocks.NewROBlockWithRoot(roSignedBlock, root)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrapf(err, "failed to create roBlock with root %#x", root), Reason: core.Internal}
}
return p.blobsFromStoredDataColumns(roBlock, indices)
}
return p.blobsFromStoredBlobs(commitments, root, indices)
}
// blobsFromStoredBlobs retrieves blob sidercars corresponding to `indices` and `root` from the store.
// This function expects blob sidecars to be stored (aka. no data column sidecars).
func (p *BeaconDbBlocker) blobsFromStoredBlobs(commitments [][]byte, root [fieldparams.RootLength]byte, indices []int) ([]*blocks.VerifiedROBlob, *core.RpcError) {
summary := p.BlobStorage.Summary(root)
maxBlobCount := summary.MaxBlobsForEpoch()
for _, index := range indices {
if uint64(index) >= maxBlobCount {
return nil, &core.RpcError{
Err: fmt.Errorf("requested index %d is bigger than the maximum possible blob count %d", index, maxBlobCount),
Reason: core.BadRequest,
}
}
if !summary.HasIndex(uint64(index)) {
return nil, &core.RpcError{
Err: fmt.Errorf("requested index %d not found", index),
Reason: core.NotFound,
}
}
}
// If no indices are provided, use all indices that are available in the summary.
if len(indices) == 0 {
for index := range commitments {
if summary.HasIndex(uint64(index)) {
indices = append(indices, index)
for i := range commitments {
if sum.HasIndex(uint64(i)) {
indices = append(indices, i)
}
}
} else {
for _, ix := range indices {
if uint64(ix) >= sum.MaxBlobsForEpoch() {
return nil, &core.RpcError{
Err: fmt.Errorf("requested index %d is bigger than the maximum possible blob count %d", ix, sum.MaxBlobsForEpoch()),
Reason: core.BadRequest,
}
}
if !sum.HasIndex(uint64(ix)) {
return nil, &core.RpcError{
Err: fmt.Errorf("requested index %d not found", ix),
Reason: core.NotFound,
}
}
}
}
// Retrieve blob sidecars from the store.
blobs := make([]*blocks.VerifiedROBlob, 0, len(indices))
for _, index := range indices {
blobSidecar, err := p.BlobStorage.Get(root, uint64(index))
blobs := make([]*blocks.VerifiedROBlob, len(indices))
for i, index := range indices {
vblob, err := p.BlobStorage.Get(root, uint64(index))
if err != nil {
return nil, &core.RpcError{
Err: fmt.Errorf("could not retrieve blob for block root %#x at index %d", root, index),
Err: fmt.Errorf("could not retrieve blob for block root %#x at index %d", rootSlice, index),
Reason: core.Internal,
}
}
blobs = append(blobs, &blobSidecar)
blobs[i] = &vblob
}
return blobs, nil
}
// blobsFromStoredDataColumns retrieves data column sidecars from the store,
// reconstructs the whole matrix if needed, converts the matrix to blobs,
// and then returns converted blobs corresponding to `indices` and `root`.
// This function expects data column sidecars to be stored (aka. no blob sidecars).
// If not enough data column sidecars are available to convert blobs from them
// (either directly or after reconstruction), an error is returned.
func (p *BeaconDbBlocker) blobsFromStoredDataColumns(block blocks.ROBlock, indices []int) ([]*blocks.VerifiedROBlob, *core.RpcError) {
root := block.Root()
// Use all indices if none are provided.
if len(indices) == 0 {
commitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, &core.RpcError{
Err: errors.Wrap(err, "could not retrieve blob commitments"),
Reason: core.Internal,
}
}
for index := range commitments {
indices = append(indices, index)
}
}
// Count how many columns we have in the store.
summary := p.DataColumnStorage.Summary(root)
stored := summary.Stored()
count := uint64(len(stored))
if count < peerdas.MinimumColumnsCountToReconstruct() {
// There is no way to reconstruct the data columns.
return nil, &core.RpcError{
Err: errors.Errorf("the node does not custody enough data columns to reconstruct blobs - please start the beacon node with the `--%s` flag to ensure this call to succeed, or retry later if it is already the case", flags.SubscribeAllDataSubnets.Name),
Reason: core.NotFound,
}
}
// Retrieve from the database needed data columns.
verifiedRoDataColumnSidecars, err := p.neededDataColumnSidecars(root, stored)
if err != nil {
return nil, &core.RpcError{
Err: errors.Wrap(err, "needed data column sidecars"),
Reason: core.Internal,
}
}
// Reconstruct blob sidecars from data column sidecars.
verifiedRoBlobSidecars, err := peerdas.ReconstructBlobs(block, verifiedRoDataColumnSidecars, indices)
if err != nil {
return nil, &core.RpcError{
Err: errors.Wrap(err, "blobs from data columns"),
Reason: core.Internal,
}
}
return verifiedRoBlobSidecars, nil
}
// neededDataColumnSidecars retrieves all data column sidecars corresponding to (non extended) blobs if available,
// else retrieves all data column sidecars from the store.
func (p *BeaconDbBlocker) neededDataColumnSidecars(root [fieldparams.RootLength]byte, stored map[uint64]bool) ([]blocks.VerifiedRODataColumn, error) {
// Check if we have all the non-extended data columns.
cellsPerBlob := fieldparams.CellsPerBlob
blobIndices := make([]uint64, 0, cellsPerBlob)
hasAllBlobColumns := true
for i := range uint64(cellsPerBlob) {
if !stored[i] {
hasAllBlobColumns = false
break
}
blobIndices = append(blobIndices, i)
}
if hasAllBlobColumns {
// Retrieve only the non-extended data columns.
verifiedRoSidecars, err := p.DataColumnStorage.Get(root, blobIndices)
if err != nil {
return nil, errors.Wrap(err, "data columns storage get")
}
return verifiedRoSidecars, nil
}
// Retrieve all the data columns.
verifiedRoSidecars, err := p.DataColumnStorage.Get(root, nil)
if err != nil {
return nil, errors.Wrap(err, "data columns storage get")
}
return verifiedRoSidecars, nil
}

View File

@@ -8,15 +8,12 @@ import (
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/rpc/core"
"github.com/OffchainLabs/prysm/v6/beacon-chain/rpc/testutil"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
@@ -161,335 +158,172 @@ func TestGetBlock(t *testing.T) {
}
func TestGetBlob(t *testing.T) {
const (
slot = 123
blobCount = 4
denebForEpoch = 1
fuluForkEpoch = 2
)
setupDeneb := func(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.DenebForkEpoch = denebForEpoch
params.OverrideBeaconConfig(cfg)
}
setupFulu := func(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.DenebForkEpoch = denebForEpoch
cfg.FuluForkEpoch = fuluForkEpoch
params.OverrideBeaconConfig(cfg)
}
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.DenebForkEpoch = 1
params.OverrideBeaconConfig(cfg)
ctx := t.Context()
db := testDB.SetupDB(t)
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
// Create and save Deneb block and blob sidecars.
_, blobStorage := filesystem.NewEphemeralBlobStorageAndFs(t)
denebBlock, storedBlobSidecars := util.GenerateTestDenebBlockWithSidecar(t, [fieldparams.RootLength]byte{}, slot, blobCount)
denebBlockRoot := denebBlock.Root()
verifiedStoredSidecars := verification.FakeVerifySliceForTest(t, storedBlobSidecars)
for i := range verifiedStoredSidecars {
err := blobStorage.Save(verifiedStoredSidecars[i])
require.NoError(t, err)
denebBlock, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 123, 4)
require.NoError(t, db.SaveBlock(t.Context(), denebBlock))
_, bs := filesystem.NewEphemeralBlobStorageAndFs(t)
testSidecars := verification.FakeVerifySliceForTest(t, blobs)
for i := range testSidecars {
require.NoError(t, bs.Save(testSidecars[i]))
}
err = db.SaveBlock(t.Context(), denebBlock)
require.NoError(t, err)
// Create Electra block and blob sidecars. (Electra block = Fulu block),
// save the block, convert blob sidecars to data column sidecars and save the block.
fuluForkSlot := fuluForkEpoch * params.BeaconConfig().SlotsPerEpoch
fuluBlock, fuluBlobSidecars := util.GenerateTestElectraBlockWithSidecar(t, [fieldparams.RootLength]byte{}, fuluForkSlot, blobCount)
fuluBlockRoot := fuluBlock.Root()
cellsAndProofsList := make([]kzg.CellsAndProofs, 0, len(fuluBlobSidecars))
for _, blob := range fuluBlobSidecars {
var kzgBlob kzg.Blob
copy(kzgBlob[:], blob.Blob)
cellsAndProogs, err := kzg.ComputeCellsAndKZGProofs(&kzgBlob)
require.NoError(t, err)
cellsAndProofsList = append(cellsAndProofsList, cellsAndProogs)
}
dataColumnSidecarPb, err := peerdas.DataColumnSidecars(fuluBlock, cellsAndProofsList)
require.NoError(t, err)
verifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, len(dataColumnSidecarPb))
for _, sidecarPb := range dataColumnSidecarPb {
roDataColumn, err := blocks.NewRODataColumnWithRoot(sidecarPb, fuluBlockRoot)
require.NoError(t, err)
verifiedRoDataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumn)
}
err = db.SaveBlock(t.Context(), fuluBlock)
require.NoError(t, err)
blockRoot := blobs[0].BlockRoot()
t.Run("genesis", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{}
_, rpcErr := blocker.Blobs(ctx, "genesis", nil)
require.Equal(t, http.StatusBadRequest, core.ErrorReasonToHTTP(rpcErr.Reason))
require.StringContains(t, "blobs are not supported for Phase 0 fork", rpcErr.Err.Error())
assert.Equal(t, http.StatusBadRequest, core.ErrorReasonToHTTP(rpcErr.Reason))
assert.StringContains(t, "blobs are not supported for Phase 0 fork", rpcErr.Err.Error())
})
t.Run("head", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{
ChainInfoFetcher: &mockChain.ChainService{Root: denebBlockRoot[:]},
ChainInfoFetcher: &mockChain.ChainService{Root: blockRoot[:]},
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
}
retrievedVerifiedSidecars, rpcErr := blocker.Blobs(ctx, "head", nil)
require.IsNil(t, rpcErr)
require.Equal(t, blobCount, len(retrievedVerifiedSidecars))
for i := range blobCount {
expected := verifiedStoredSidecars[i]
actual := retrievedVerifiedSidecars[i].BlobSidecar
require.NotNil(t, actual)
require.Equal(t, expected.Index, actual.Index)
require.DeepEqual(t, expected.Blob, actual.Blob)
require.DeepEqual(t, expected.KzgCommitment, actual.KzgCommitment)
require.DeepEqual(t, expected.KzgProof, actual.KzgProof)
BlobStorage: bs,
}
verifiedBlobs, rpcErr := blocker.Blobs(ctx, "head", nil)
assert.Equal(t, rpcErr == nil, true)
require.Equal(t, 4, len(verifiedBlobs))
sidecar := verifiedBlobs[0].BlobSidecar
require.NotNil(t, sidecar)
assert.Equal(t, uint64(0), sidecar.Index)
assert.DeepEqual(t, blobs[0].Blob, sidecar.Blob)
assert.DeepEqual(t, blobs[0].KzgCommitment, sidecar.KzgCommitment)
assert.DeepEqual(t, blobs[0].KzgProof, sidecar.KzgProof)
sidecar = verifiedBlobs[1].BlobSidecar
require.NotNil(t, sidecar)
assert.Equal(t, uint64(1), sidecar.Index)
assert.DeepEqual(t, blobs[1].Blob, sidecar.Blob)
assert.DeepEqual(t, blobs[1].KzgCommitment, sidecar.KzgCommitment)
assert.DeepEqual(t, blobs[1].KzgProof, sidecar.KzgProof)
sidecar = verifiedBlobs[2].BlobSidecar
require.NotNil(t, sidecar)
assert.Equal(t, uint64(2), sidecar.Index)
assert.DeepEqual(t, blobs[2].Blob, sidecar.Blob)
assert.DeepEqual(t, blobs[2].KzgCommitment, sidecar.KzgCommitment)
assert.DeepEqual(t, blobs[2].KzgProof, sidecar.KzgProof)
sidecar = verifiedBlobs[3].BlobSidecar
require.NotNil(t, sidecar)
assert.Equal(t, uint64(3), sidecar.Index)
assert.DeepEqual(t, blobs[3].Blob, sidecar.Blob)
assert.DeepEqual(t, blobs[3].KzgCommitment, sidecar.KzgCommitment)
assert.DeepEqual(t, blobs[3].KzgProof, sidecar.KzgProof)
})
t.Run("finalized", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: denebBlockRoot[:]}},
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: blockRoot[:]}},
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
BlobStorage: bs,
}
verifiedSidecars, rpcErr := blocker.Blobs(ctx, "finalized", nil)
require.IsNil(t, rpcErr)
require.Equal(t, blobCount, len(verifiedSidecars))
verifiedBlobs, rpcErr := blocker.Blobs(ctx, "finalized", nil)
assert.Equal(t, rpcErr == nil, true)
require.Equal(t, 4, len(verifiedBlobs))
})
t.Run("justified", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{
ChainInfoFetcher: &mockChain.ChainService{CurrentJustifiedCheckPoint: &ethpb.Checkpoint{Root: denebBlockRoot[:]}},
ChainInfoFetcher: &mockChain.ChainService{CurrentJustifiedCheckPoint: &ethpb.Checkpoint{Root: blockRoot[:]}},
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
BlobStorage: bs,
}
verifiedSidecars, rpcErr := blocker.Blobs(ctx, "justified", nil)
require.IsNil(t, rpcErr)
require.Equal(t, blobCount, len(verifiedSidecars))
verifiedBlobs, rpcErr := blocker.Blobs(ctx, "justified", nil)
assert.Equal(t, rpcErr == nil, true)
require.Equal(t, 4, len(verifiedBlobs))
})
t.Run("root", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
BlobStorage: bs,
}
verifiedBlobs, rpcErr := blocker.Blobs(ctx, hexutil.Encode(denebBlockRoot[:]), nil)
require.IsNil(t, rpcErr)
require.Equal(t, blobCount, len(verifiedBlobs))
verifiedBlobs, rpcErr := blocker.Blobs(ctx, hexutil.Encode(blockRoot[:]), nil)
assert.Equal(t, rpcErr == nil, true)
require.Equal(t, 4, len(verifiedBlobs))
})
t.Run("slot", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
BlobStorage: bs,
}
verifiedBlobs, rpcErr := blocker.Blobs(ctx, "123", nil)
require.IsNil(t, rpcErr)
require.Equal(t, blobCount, len(verifiedBlobs))
assert.Equal(t, rpcErr == nil, true)
require.Equal(t, 4, len(verifiedBlobs))
})
t.Run("one blob only", func(t *testing.T) {
const index = 2
setupDeneb(t)
blocker := &BeaconDbBlocker{
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: denebBlockRoot[:]}},
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: blockRoot[:]}},
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
BlobStorage: bs,
}
retrievedVerifiedSidecars, rpcErr := blocker.Blobs(ctx, "123", []int{index})
require.IsNil(t, rpcErr)
require.Equal(t, 1, len(retrievedVerifiedSidecars))
expected := verifiedStoredSidecars[index]
actual := retrievedVerifiedSidecars[0].BlobSidecar
require.NotNil(t, actual)
require.Equal(t, uint64(index), actual.Index)
require.DeepEqual(t, expected.Blob, actual.Blob)
require.DeepEqual(t, expected.KzgCommitment, actual.KzgCommitment)
require.DeepEqual(t, expected.KzgProof, actual.KzgProof)
verifiedBlobs, rpcErr := blocker.Blobs(ctx, "123", []int{2})
assert.Equal(t, rpcErr == nil, true)
require.Equal(t, 1, len(verifiedBlobs))
sidecar := verifiedBlobs[0].BlobSidecar
require.NotNil(t, sidecar)
assert.Equal(t, uint64(2), sidecar.Index)
assert.DeepEqual(t, blobs[2].Blob, sidecar.Blob)
assert.DeepEqual(t, blobs[2].KzgCommitment, sidecar.KzgCommitment)
assert.DeepEqual(t, blobs[2].KzgProof, sidecar.KzgProof)
})
t.Run("no blobs returns an empty array", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: denebBlockRoot[:]}},
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: blockRoot[:]}},
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: filesystem.NewEphemeralBlobStorage(t),
}
verifiedBlobs, rpcErr := blocker.Blobs(ctx, "123", nil)
require.IsNil(t, rpcErr)
assert.Equal(t, rpcErr == nil, true)
require.Equal(t, 0, len(verifiedBlobs))
})
t.Run("no blob at index", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: denebBlockRoot[:]}},
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: blockRoot[:]}},
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
BlobStorage: bs,
}
noBlobIndex := len(storedBlobSidecars) + 1
noBlobIndex := len(blobs) + 1
_, rpcErr := blocker.Blobs(ctx, "123", []int{0, noBlobIndex})
require.NotNil(t, rpcErr)
require.Equal(t, core.ErrorReason(core.NotFound), rpcErr.Reason)
assert.Equal(t, core.ErrorReason(core.NotFound), rpcErr.Reason)
})
t.Run("index too big", func(t *testing.T) {
setupDeneb(t)
blocker := &BeaconDbBlocker{
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: denebBlockRoot[:]}},
ChainInfoFetcher: &mockChain.ChainService{FinalizedCheckPoint: &ethpb.Checkpoint{Root: blockRoot[:]}},
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
BlobStorage: bs,
}
_, rpcErr := blocker.Blobs(ctx, "123", []int{0, math.MaxInt})
require.NotNil(t, rpcErr)
require.Equal(t, core.ErrorReason(core.BadRequest), rpcErr.Reason)
})
t.Run("not enough stored data column sidecars", func(t *testing.T) {
setupFulu(t)
_, dataColumnStorage := filesystem.NewEphemeralDataColumnStorageAndFs(t)
err = dataColumnStorage.Save(verifiedRoDataColumnSidecars[:fieldparams.CellsPerBlob-1])
require.NoError(t, err)
blocker := &BeaconDbBlocker{
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
DataColumnStorage: dataColumnStorage,
}
_, rpcErr := blocker.Blobs(ctx, hexutil.Encode(fuluBlockRoot[:]), nil)
require.NotNil(t, rpcErr)
require.Equal(t, core.ErrorReason(core.NotFound), rpcErr.Reason)
})
t.Run("reconstruction needed", func(t *testing.T) {
setupFulu(t)
_, dataColumnStorage := filesystem.NewEphemeralDataColumnStorageAndFs(t)
err = dataColumnStorage.Save(verifiedRoDataColumnSidecars[1 : peerdas.MinimumColumnsCountToReconstruct()+1])
require.NoError(t, err)
blocker := &BeaconDbBlocker{
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
DataColumnStorage: dataColumnStorage,
}
retrievedVerifiedRoBlobs, rpcErr := blocker.Blobs(ctx, hexutil.Encode(fuluBlockRoot[:]), nil)
require.IsNil(t, rpcErr)
require.Equal(t, len(fuluBlobSidecars), len(retrievedVerifiedRoBlobs))
for i, retrievedVerifiedRoBlob := range retrievedVerifiedRoBlobs {
retrievedBlobSidecarPb := retrievedVerifiedRoBlob.BlobSidecar
initialBlobSidecarPb := fuluBlobSidecars[i].BlobSidecar
require.DeepSSZEqual(t, initialBlobSidecarPb, retrievedBlobSidecarPb)
}
})
t.Run("no reconstruction needed", func(t *testing.T) {
setupFulu(t)
_, dataColumnStorage := filesystem.NewEphemeralDataColumnStorageAndFs(t)
err = dataColumnStorage.Save(verifiedRoDataColumnSidecars)
require.NoError(t, err)
blocker := &BeaconDbBlocker{
GenesisTimeFetcher: &testutil.MockGenesisTimeFetcher{
Genesis: time.Now(),
},
BeaconDB: db,
BlobStorage: blobStorage,
DataColumnStorage: dataColumnStorage,
}
retrievedVerifiedRoBlobs, rpcErr := blocker.Blobs(ctx, hexutil.Encode(fuluBlockRoot[:]), nil)
require.IsNil(t, rpcErr)
require.Equal(t, len(fuluBlobSidecars), len(retrievedVerifiedRoBlobs))
for i, retrievedVerifiedRoBlob := range retrievedVerifiedRoBlobs {
retrievedBlobSidecarPb := retrievedVerifiedRoBlob.BlobSidecar
initialBlobSidecarPb := fuluBlobSidecars[i].BlobSidecar
require.DeepSSZEqual(t, initialBlobSidecarPb, retrievedBlobSidecarPb)
}
assert.Equal(t, core.ErrorReason(core.BadRequest), rpcErr.Reason)
})
}

View File

@@ -13,7 +13,7 @@ import (
const signatureVerificationInterval = 50 * time.Millisecond
const verifierLimit = 1000
const verifierLimit = 50
type signatureVerifier struct {
set *bls.SignatureBatch

View File

@@ -12,7 +12,6 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
"github.com/OffchainLabs/prysm/v6/time/slots"
@@ -99,15 +98,12 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
log.Debug("Serving data column sidecar by root request")
count := 0
for _, ident := range requestedColumnIdents {
for root, columns := range requestedColumnsByRoot {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
return errors.Wrap(err, "context error")
}
root := bytesutil.ToBytes32(ident.BlockRoot)
columns := ident.Columns
// Throttle request processing to no more than batchSize/sec.
for range columns {
if ticker != nil && count != 0 && count%batchSize == 0 {

View File

@@ -4,14 +4,12 @@ import (
"context"
"fmt"
"io"
"slices"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
@@ -32,18 +30,16 @@ var errBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob")
// Any error from the following declaration block should result in peer downscoring.
var (
// ErrInvalidFetchedData is used to signal that an error occurred which should result in peer downscoring.
ErrInvalidFetchedData = errors.New("invalid data returned from peer")
errBlobIndexOutOfBounds = errors.Wrap(verification.ErrBlobInvalid, "blob index out of range")
errMaxRequestBlobSidecarsExceeded = errors.Wrap(verification.ErrBlobInvalid, "peer exceeded req blob chunk tx limit")
errChunkResponseSlotNotAsc = errors.Wrap(verification.ErrBlobInvalid, "blob slot not higher than previous block root")
errChunkResponseIndexNotAsc = errors.Wrap(verification.ErrBlobInvalid, "blob indices for a block must start at 0 and increase by 1")
errUnrequested = errors.Wrap(verification.ErrBlobInvalid, "received BlobSidecar in response that was not requested")
errBlobResponseOutOfBounds = errors.Wrap(verification.ErrBlobInvalid, "received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
errChunkResponseBlockMismatch = errors.Wrap(verification.ErrBlobInvalid, "blob block details do not match")
errChunkResponseParentMismatch = errors.Wrap(verification.ErrBlobInvalid, "parent root for response element doesn't match previous element root")
errDataColumnChunkedReadFailure = errors.New("failed to read stream of chunk-encoded data columns")
errMaxRequestDataColumnSidecarsExceeded = errors.New("count of requested data column sidecars exceeds MAX_REQUEST_DATA_COLUMN_SIDECARS")
errMaxResponseDataColumnSidecarsExceeded = errors.New("peer returned more data column sidecars than requested")
ErrInvalidFetchedData = errors.New("invalid data returned from peer")
errBlobIndexOutOfBounds = errors.Wrap(verification.ErrBlobInvalid, "blob index out of range")
errMaxRequestBlobSidecarsExceeded = errors.Wrap(verification.ErrBlobInvalid, "peer exceeded req blob chunk tx limit")
errChunkResponseSlotNotAsc = errors.Wrap(verification.ErrBlobInvalid, "blob slot not higher than previous block root")
errChunkResponseIndexNotAsc = errors.Wrap(verification.ErrBlobInvalid, "blob indices for a block must start at 0 and increase by 1")
errUnrequested = errors.Wrap(verification.ErrBlobInvalid, "received BlobSidecar in response that was not requested")
errBlobResponseOutOfBounds = errors.Wrap(verification.ErrBlobInvalid, "received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
errChunkResponseBlockMismatch = errors.Wrap(verification.ErrBlobInvalid, "blob block details do not match")
errChunkResponseParentMismatch = errors.Wrap(verification.ErrBlobInvalid, "parent root for response element doesn't match previous element root")
errDataColumnChunkedReadFailure = errors.New("failed to read stream of chunk-encoded data columns")
)
// ------
@@ -401,245 +397,6 @@ func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncod
// Data column sidecars
// --------------------
// SendDataColumnSidecarsByRangeRequest sends a request for data column sidecars by range
// and returns the fetched data column sidecars.
func SendDataColumnSidecarsByRangeRequest(
ctx context.Context,
tor blockchain.TemporalOracle,
p2pApi p2p.P2P,
pid peer.ID,
ctxMap ContextByteVersions,
request *ethpb.DataColumnSidecarsByRangeRequest,
) ([]blocks.RODataColumn, error) {
// Return early if nothing to request.
if request == nil || request.Count == 0 || len(request.Columns) == 0 {
return nil, nil
}
beaconConfig := params.BeaconConfig()
numberOfColumns := beaconConfig.NumberOfColumns
maxRequestDataColumnSidecars := params.BeaconConfig().MaxRequestDataColumnSidecars
// Check if we do not request too many sidecars.
columnsCount := uint64(len(request.Columns))
totalCount := request.Count * columnsCount
if totalCount > maxRequestDataColumnSidecars {
return nil, errors.Wrapf(errMaxRequestDataColumnSidecarsExceeded, "requestedCount=%d, allowedCount=%d", totalCount, maxRequestDataColumnSidecars)
}
// Build the topic.
currentSlot := tor.CurrentSlot()
currentEpoch := slots.ToEpoch(currentSlot)
topic, err := p2p.TopicFromMessage(p2p.DataColumnSidecarsByRangeName, currentEpoch)
if err != nil {
return nil, errors.Wrap(err, "topic from message")
}
// Build the logs.
var columnsLog interface{} = "all"
if columnsCount < numberOfColumns {
columns := request.Columns
slices.Sort(columns)
columnsLog = columns
}
log := log.WithFields(logrus.Fields{
"peer": pid,
"topic": topic,
"startSlot": request.StartSlot,
"count": request.Count,
"columns": columnsLog,
"totalCount": totalCount,
})
// Send the request.
stream, err := p2pApi.Send(ctx, request, topic, pid)
if err != nil {
return nil, errors.Wrap(err, "p2p send")
}
defer closeStream(stream, log)
// Read the data column sidecars from the stream.
roDataColumns := make([]blocks.RODataColumn, 0, totalCount)
for range totalCount {
// Avoid reading extra chunks if the context is done.
if err := ctx.Err(); err != nil {
return nil, err
}
validatorSlotWithinBounds, err := isSidecarSlotWithinBounds(request)
if err != nil {
return nil, errors.Wrap(err, "is sidecar slot within bounds")
}
roDataColumn, err := readChunkedDataColumnSidecar(
stream, p2pApi, ctxMap,
validatorSlotWithinBounds,
isSidecarIndexRequested(request),
)
if errors.Is(err, io.EOF) {
return roDataColumns, nil
}
if err != nil {
return nil, errors.Wrap(err, "read chunked data column sidecar")
}
if roDataColumn == nil {
return nil, errors.New("nil data column sidecar, should never happen")
}
roDataColumns = append(roDataColumns, *roDataColumn)
}
// All requested sidecars were delivered by the peer. Expecting EOF.
if _, err := readChunkedDataColumnSidecar(stream, p2pApi, ctxMap); !errors.Is(err, io.EOF) {
return nil, errors.Wrapf(errMaxResponseDataColumnSidecarsExceeded, "requestedCount=%d", totalCount)
}
return roDataColumns, nil
}
// isSidecarSlotWithinBounds verifies that the slot of the data column sidecar is within the bounds of the request.
func isSidecarSlotWithinBounds(request *ethpb.DataColumnSidecarsByRangeRequest) (DataColumnResponseValidation, error) {
// endSlot is exclusive (while request.StartSlot is inclusive).
endSlot, err := request.StartSlot.SafeAdd(request.Count)
if err != nil {
return nil, errors.Wrap(err, "calculate end slot")
}
validator := func(sidecar blocks.RODataColumn) error {
slot := sidecar.Slot()
if !(request.StartSlot <= slot && slot < endSlot) {
return errors.Errorf("data column sidecar slot %d out of range [%d, %d[", slot, request.StartSlot, endSlot)
}
return nil
}
return validator, nil
}
// isSidecarIndexRequested verifies that the index of the data column sidecar is found in the requested indices.
func isSidecarIndexRequested(request *ethpb.DataColumnSidecarsByRangeRequest) DataColumnResponseValidation {
requestedIndices := make(map[uint64]bool)
for _, col := range request.Columns {
requestedIndices[col] = true
}
return func(sidecar blocks.RODataColumn) error {
columnIndex := sidecar.Index
if !requestedIndices[columnIndex] {
return errors.Errorf("data column sidecar index %d not found in requested indices", columnIndex)
}
return nil
}
}
// SendDataColumnSidecarsByRootRequest sends a request for data column sidecars by root
// and returns the fetched data column sidecars.
func SendDataColumnSidecarsByRootRequest(
ctx context.Context,
tor blockchain.TemporalOracle,
p2pApi p2p.P2P,
pid peer.ID,
ctxMap ContextByteVersions,
request p2ptypes.DataColumnsByRootIdentifiers,
) ([]blocks.RODataColumn, error) {
// Return early if the request is nil.
if request == nil {
return nil, nil
}
// Compute how many sidecars are requested.
count := uint64(0)
for _, identifier := range request {
count += uint64(len(identifier.Columns))
}
// Return early if nothing to request.
if count == 0 {
return nil, nil
}
// Verify that the request count is within the maximum allowed.
maxRequestDataColumnSidecars := params.BeaconConfig().MaxRequestDataColumnSidecars
if count > maxRequestDataColumnSidecars {
return nil, errors.Wrapf(errMaxRequestDataColumnSidecarsExceeded, "current: %d, max: %d", count, maxRequestDataColumnSidecars)
}
// Get the topic for the request.
topic, err := p2p.TopicFromMessage(p2p.DataColumnSidecarsByRootName, slots.ToEpoch(tor.CurrentSlot()))
if err != nil {
return nil, errors.Wrap(err, "topic from message")
}
// Send the request to the peer.
stream, err := p2pApi.Send(ctx, request, topic, pid)
if err != nil {
return nil, errors.Wrap(err, "p2p api send")
}
defer closeStream(stream, log)
// Read the data column sidecars from the stream.
roDataColumns := make([]blocks.RODataColumn, 0, count)
// Read the data column sidecars from the stream.
for range count {
roDataColumn, err := readChunkedDataColumnSidecar(stream, p2pApi, ctxMap, isSidecarIndexRootRequested(request))
if errors.Is(err, io.EOF) {
return roDataColumns, nil
}
if err != nil {
return nil, errors.Wrap(err, "read chunked data column sidecar")
}
if roDataColumn == nil {
return nil, errors.Wrap(err, "nil data column sidecar, should never happen")
}
roDataColumns = append(roDataColumns, *roDataColumn)
}
// All requested sidecars were delivered by the peer. Expecting EOF.
if _, err := readChunkedDataColumnSidecar(stream, p2pApi, ctxMap); !errors.Is(err, io.EOF) {
return nil, errors.Wrapf(errMaxResponseDataColumnSidecarsExceeded, "requestedCount=%d", count)
}
return roDataColumns, nil
}
func isSidecarIndexRootRequested(request p2ptypes.DataColumnsByRootIdentifiers) DataColumnResponseValidation {
columnsIndexFromRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool)
for _, sidecar := range request {
blockRoot := bytesutil.ToBytes32(sidecar.BlockRoot)
if columnsIndexFromRoot[blockRoot] == nil {
columnsIndexFromRoot[blockRoot] = make(map[uint64]bool)
}
for _, column := range sidecar.Columns {
columnsIndexFromRoot[blockRoot][column] = true
}
}
return func(sidecar blocks.RODataColumn) error {
root, index := sidecar.BlockRoot(), sidecar.Index
indices, ok := columnsIndexFromRoot[root]
if !ok {
return errors.Errorf("root #%x returned by peer but not requested", root)
}
if !indices[index] {
return errors.Errorf("index %d for root #%x returned by peer but not requested", index, root)
}
return nil
}
}
// DataColumnResponseValidation represents a function that can validate aspects of a single unmarshaled data column sidecar
// that was received from a peer in response to an rpc request.
type DataColumnResponseValidation func(column blocks.RODataColumn) error

View File

@@ -11,9 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
p2pTypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
@@ -887,544 +885,6 @@ func TestErrInvalidFetchedDataDistinction(t *testing.T) {
require.Equal(t, false, errors.Is(ErrInvalidFetchedData, verification.ErrBlobInvalid))
}
func TestSendDataColumnSidecarsByRangeRequest(t *testing.T) {
nilTestCases := []struct {
name string
request *ethpb.DataColumnSidecarsByRangeRequest
}{
{
name: "nil request",
request: nil,
},
{
name: "count is 0",
request: &ethpb.DataColumnSidecarsByRangeRequest{},
},
{
name: "columns is nil",
request: &ethpb.DataColumnSidecarsByRangeRequest{Count: 1},
},
}
for _, tc := range nilTestCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := SendDataColumnSidecarsByRangeRequest(t.Context(), nil, nil, "aRandomPID", nil, tc.request)
require.NoError(t, err)
require.IsNil(t, actual)
})
}
t.Run("too many columns in request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 0
params.OverrideBeaconConfig(beaconConfig)
request := &ethpb.DataColumnSidecarsByRangeRequest{Count: 1, Columns: []uint64{1, 2, 3}}
_, err := SendDataColumnSidecarsByRangeRequest(t.Context(), nil, nil, "aRandomPID", nil, request)
require.ErrorContains(t, errMaxRequestDataColumnSidecarsExceeded.Error(), err)
})
type slotIndex struct {
Slot primitives.Slot
Index uint64
}
createSidecar := func(slotIndex slotIndex) *ethpb.DataColumnSidecar {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
return &ethpb.DataColumnSidecar{
Index: slotIndex.Index,
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: slotIndex.Slot,
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
}
testCases := []struct {
name string
slotIndices []slotIndex
expectedError error
}{
{
name: "too many responses",
slotIndices: []slotIndex{
{Slot: 0, Index: 1},
{Slot: 0, Index: 2},
{Slot: 0, Index: 3},
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
{Slot: 0, Index: 3}, // Duplicate
},
expectedError: errMaxResponseDataColumnSidecarsExceeded,
},
{
name: "perfect match",
slotIndices: []slotIndex{
{Slot: 0, Index: 1},
{Slot: 0, Index: 2},
{Slot: 0, Index: 3},
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
},
},
{
name: "few responses than maximum possible",
slotIndices: []slotIndex{
{Slot: 0, Index: 1},
{Slot: 0, Index: 2},
{Slot: 0, Index: 3},
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
protocol := fmt.Sprintf("%s/ssz_snappy", p2p.RPCDataColumnSidecarsByRangeTopicV1)
clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{})
p1, p2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
p1.Connect(p2)
expected := make([]*ethpb.DataColumnSidecar, 0, len(tc.slotIndices))
for _, slotIndex := range tc.slotIndices {
sidecar := createSidecar(slotIndex)
expected = append(expected, sidecar)
}
requestSent := &ethpb.DataColumnSidecarsByRangeRequest{
StartSlot: 0,
Count: 2,
Columns: []uint64{1, 3, 2},
}
var wg sync.WaitGroup
wg.Add(1)
p2.SetStreamHandler(protocol, func(stream network.Stream) {
wg.Done()
requestReceived := new(ethpb.DataColumnSidecarsByRangeRequest)
err := p2.Encoding().DecodeWithMaxLength(stream, requestReceived)
assert.NoError(t, err)
assert.DeepSSZEqual(t, requestSent, requestReceived)
for _, sidecar := range expected {
err := WriteDataColumnSidecarChunk(stream, clock, p2.Encoding(), sidecar)
assert.NoError(t, err)
}
err = stream.CloseWrite()
assert.NoError(t, err)
})
ctx := t.Context()
ctxMap := ContextByteVersions{[4]byte{245, 165, 253, 66}: version.Fulu}
actual, err := SendDataColumnSidecarsByRangeRequest(ctx, clock, p1, p2.PeerID(), ctxMap, requestSent)
if tc.expectedError != nil {
require.ErrorContains(t, tc.expectedError.Error(), err)
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
return
}
require.Equal(t, len(expected), len(actual))
for i := range expected {
require.DeepSSZEqual(t, expected[i], actual[i].DataColumnSidecar)
}
})
}
}
func TestIsSidecarSlotWithinBounds(t *testing.T) {
request := &ethpb.DataColumnSidecarsByRangeRequest{
StartSlot: 10,
Count: 10,
}
validator, err := isSidecarSlotWithinBounds(request)
require.NoError(t, err)
testCases := []struct {
name string
slot primitives.Slot
isErrorExpected bool
}{
{
name: "too soon",
slot: 9,
isErrorExpected: true,
},
{
name: "too late",
slot: 20,
isErrorExpected: true,
},
{
name: "within bounds",
slot: 15,
isErrorExpected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
sidecarPb := &ethpb.DataColumnSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: tc.slot,
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
sidecar, err := blocks.NewRODataColumn(sidecarPb)
require.NoError(t, err)
err = validator(sidecar)
if tc.isErrorExpected {
require.NotNil(t, err)
return
}
require.NoError(t, err)
})
}
}
func TestIsSidecarIndexRequested(t *testing.T) {
request := &ethpb.DataColumnSidecarsByRangeRequest{
Columns: []uint64{2, 9, 4},
}
validator := isSidecarIndexRequested(request)
testCases := []struct {
name string
index uint64
isErrorExpected bool
}{
{
name: "not requested",
index: 1,
isErrorExpected: true,
},
{
name: "requested",
index: 9,
isErrorExpected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
sidecarPb := &ethpb.DataColumnSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: 0,
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
Index: tc.index,
}
sidecar, err := blocks.NewRODataColumn(sidecarPb)
require.NoError(t, err)
err = validator(sidecar)
if tc.isErrorExpected {
require.NotNil(t, err)
return
}
require.NoError(t, err)
})
}
}
func TestSendDataColumnSidecarsByRootRequest(t *testing.T) {
nilTestCases := []struct {
name string
request p2ptypes.DataColumnsByRootIdentifiers
}{
{
name: "nil request",
request: nil,
},
{
name: "count is 0",
request: p2ptypes.DataColumnsByRootIdentifiers{{}, {}},
},
}
for _, tc := range nilTestCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := SendDataColumnSidecarsByRootRequest(t.Context(), nil, nil, "aRandomPID", nil, tc.request)
require.NoError(t, err)
require.IsNil(t, actual)
})
}
t.Run("too many columns in request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 4
params.OverrideBeaconConfig(beaconConfig)
request := p2ptypes.DataColumnsByRootIdentifiers{
{Columns: []uint64{1, 2, 3}},
{Columns: []uint64{4, 5, 6}},
}
_, err := SendDataColumnSidecarsByRootRequest(t.Context(), nil, nil, "aRandomPID", nil, request)
require.ErrorContains(t, errMaxRequestDataColumnSidecarsExceeded.Error(), err)
})
type slotIndex struct {
Slot primitives.Slot
Index uint64
}
createSidecar := func(rootIndex slotIndex) blocks.RODataColumn {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
sidecarPb := &ethpb.DataColumnSidecar{
Index: rootIndex.Index,
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
roSidecar, err := blocks.NewRODataColumn(sidecarPb)
require.NoError(t, err)
return roSidecar
}
testCases := []struct {
name string
slotIndices []slotIndex
expectedError error
}{
{
name: "too many responses",
slotIndices: []slotIndex{
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
{Slot: 2, Index: 1},
{Slot: 2, Index: 2},
{Slot: 2, Index: 3},
{Slot: 1, Index: 3}, // Duplicate
},
expectedError: errMaxResponseDataColumnSidecarsExceeded,
},
{
name: "perfect match",
slotIndices: []slotIndex{
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
{Slot: 2, Index: 1},
{Slot: 2, Index: 2},
{Slot: 2, Index: 3},
},
},
{
name: "few responses than maximum possible",
slotIndices: []slotIndex{
{Slot: 1, Index: 1},
{Slot: 1, Index: 2},
{Slot: 1, Index: 3},
{Slot: 2, Index: 1},
{Slot: 2, Index: 2},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
protocol := fmt.Sprintf("%s/ssz_snappy", p2p.RPCDataColumnSidecarsByRootTopicV1)
clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{})
p1, p2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
p1.Connect(p2)
expected := make([]blocks.RODataColumn, 0, len(tc.slotIndices))
for _, slotIndex := range tc.slotIndices {
roSidecar := createSidecar(slotIndex)
expected = append(expected, roSidecar)
}
blockRoot1, blockRoot2 := expected[0].BlockRoot(), expected[3].BlockRoot()
sentRequest := p2ptypes.DataColumnsByRootIdentifiers{
{BlockRoot: blockRoot1[:], Columns: []uint64{1, 2, 3}},
{BlockRoot: blockRoot2[:], Columns: []uint64{1, 2, 3}},
}
var wg sync.WaitGroup
wg.Add(1)
p2.SetStreamHandler(protocol, func(stream network.Stream) {
wg.Done()
requestReceived := new(p2ptypes.DataColumnsByRootIdentifiers)
err := p2.Encoding().DecodeWithMaxLength(stream, requestReceived)
assert.NoError(t, err)
require.Equal(t, len(sentRequest), len(*requestReceived))
for i := range sentRequest {
require.DeepSSZEqual(t, (sentRequest)[i], (*requestReceived)[i])
}
for _, sidecar := range expected {
err := WriteDataColumnSidecarChunk(stream, clock, p2.Encoding(), sidecar.DataColumnSidecar)
assert.NoError(t, err)
}
err = stream.CloseWrite()
assert.NoError(t, err)
})
ctx := t.Context()
ctxMap := ContextByteVersions{[4]byte{245, 165, 253, 66}: version.Fulu}
actual, err := SendDataColumnSidecarsByRootRequest(ctx, clock, p1, p2.PeerID(), ctxMap, sentRequest)
if tc.expectedError != nil {
require.ErrorContains(t, tc.expectedError.Error(), err)
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
return
}
require.Equal(t, len(expected), len(actual))
for i := range expected {
require.DeepSSZEqual(t, expected[i], actual[i])
}
})
}
}
func TestIsSidecarIndexRootRequested(t *testing.T) {
testCases := []struct {
name string
root [fieldparams.RootLength]byte
index uint64
isErrorExpected bool
}{
{
name: "non requested root",
root: [fieldparams.RootLength]byte{2},
isErrorExpected: true,
},
{
name: "non requested index",
root: [fieldparams.RootLength]byte{1},
index: 3,
isErrorExpected: true,
},
{
name: "nominal",
root: [fieldparams.RootLength]byte{1},
index: 2,
isErrorExpected: false,
},
}
request := types.DataColumnsByRootIdentifiers{
{BlockRoot: []byte{1}, Columns: []uint64{1, 2}},
}
validator := isSidecarIndexRootRequested(request)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const count = 4
kzgCommitmentsInclusionProof := make([][]byte, 0, count)
for range count {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, 32))
}
sidecarPb := &ethpb.DataColumnSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
Index: tc.index,
}
// There is a discrepancy between `tc.root` and the real root,
// but we don't care about it here.
sidecar, err := blocks.NewRODataColumnWithRoot(sidecarPb, tc.root)
require.NoError(t, err)
err = validator(sidecar)
if tc.isErrorExpected {
require.NotNil(t, err)
return
}
require.NoError(t, err)
})
}
}
func TestReadChunkedDataColumnSidecar(t *testing.T) {
t.Run("non nil status code", func(t *testing.T) {
const reason = "a dummy reason"

View File

@@ -3,8 +3,6 @@ package sync
import (
"context"
"fmt"
"sync"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/config/features"
@@ -14,35 +12,15 @@ import (
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
var (
attestationTracker = make(map[primitives.Slot]*slotAttestationTracker)
attestationTrackerMu sync.RWMutex
)
type slotAttestationTracker struct {
count uint64
startTime time.Time
loggedThresholds map[int]bool
mu sync.RWMutex
}
func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, msg proto.Message) error {
a, ok := msg.(eth.Att)
if !ok {
return fmt.Errorf("message was not type eth.Att, type=%T", msg)
}
currentSlot := s.cfg.clock.CurrentSlot()
attSlot := a.GetData().GetSlot()
if attSlot == currentSlot {
s.trackAttestationArrival(attSlot)
}
if features.Get().EnableExperimentalAttestationPool {
return s.cfg.attestationCache.Add(a)
} else {
@@ -57,65 +35,6 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
}
}
func (s *Service) trackAttestationArrival(slot primitives.Slot) {
attestationTrackerMu.Lock()
tracker, exists := attestationTracker[slot]
if !exists {
slotStartTime, err := slots.ToTime(uint64(s.cfg.clock.GenesisTime().Unix()), slot)
if err != nil {
attestationTrackerMu.Unlock()
return
}
tracker = &slotAttestationTracker{
startTime: slotStartTime,
loggedThresholds: make(map[int]bool),
}
attestationTracker[slot] = tracker
}
attestationTrackerMu.Unlock()
tracker.mu.Lock()
tracker.count++
currentCount := tracker.count
sinceStart := time.Since(tracker.startTime)
tracker.mu.Unlock()
expectedAttestations := 1083289 / 32
percentage := float64(currentCount) / float64(expectedAttestations) * 100
thresholds := []int{40, 50, 66, 80, 90, 98}
tracker.mu.Lock()
defer tracker.mu.Unlock()
for _, threshold := range thresholds {
if percentage >= float64(threshold) && !tracker.loggedThresholds[threshold] {
log.WithFields(logrus.Fields{
"slotStartTime": tracker.startTime.Unix(),
"slot": slot,
"count": currentCount,
"expected": expectedAttestations,
"percentage": fmt.Sprintf("%.1f%%", percentage),
"sinceSlotStartTime": sinceStart,
"sinceAttCutoffTime": sinceStart - 4*time.Second,
}).Info("Attestation arrival threshold reached")
tracker.loggedThresholds[threshold] = true
}
}
s.cleanupOldTrackers(slot)
}
func (s *Service) cleanupOldTrackers(currentSlot primitives.Slot) {
attestationTrackerMu.Lock()
defer attestationTrackerMu.Unlock()
for slot := range attestationTracker {
if slot < currentSlot-2 {
delete(attestationTracker, slot)
}
}
}
func (*Service) persistentSubnetIndices() []uint64 {
return cache.SubnetIDs.GetAllSubnets()
}

View File

@@ -0,0 +1,3 @@
### Added
- Added `json-only` debugging feature flag on validator client that forces use of json requests on api endpoints that support both json and ssz.

View File

@@ -1,2 +0,0 @@
### Added
- Implement beacon API blob sidecar enpoint for Fulu.

View File

@@ -1,2 +0,0 @@
### Fixed
- Non deterministic output order of `dataColumnSidecarByRootRPCHandler`.

View File

@@ -1,3 +0,0 @@
### Added
- Implement `SendDataColumnSidecarsByRangeRequest`.
- Implement `SendDataColumnSidecarsByRootRequest`.

View File

@@ -53,6 +53,7 @@ type Flags struct {
EnableDutiesV2 bool // EnableDutiesV2 sets validator client to use the get Duties V2 endpoint
EnableWeb bool // EnableWeb enables the webui on the validator client
SSZOnly bool // SSZOnly forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled (useful for debugging)
JSONOnly bool // JSONOnly forces the validator client to use JSON for communication with the beacon node when REST mode is enabled (useful for debugging)
// Logging related toggles.
DisableGRPCConnectionLogs bool // Disables logging when a new grpc client has connected.
EnableFullSSZDataLogging bool // Enables logging for full ssz data on rejected gossip messages
@@ -349,6 +350,10 @@ func ConfigureValidator(ctx *cli.Context) error {
logEnabled(SSZOnly)
cfg.SSZOnly = true
}
if ctx.Bool(JSONOnly.Name) {
logEnabled(JSONOnly)
cfg.JSONOnly = true
}
cfg.KeystoreImportDebounceInterval = ctx.Duration(dynamicKeyReloadDebounceInterval.Name)
Init(cfg)

View File

@@ -207,6 +207,12 @@ var (
Name: "ssz-only",
Usage: "(debug): Forces the validator client to use SSZ for communication with the beacon node when REST mode is enabled",
}
// JSONOnly forces the validator client to use JSON for communication with the beacon node when REST mode is enabled
JSONOnly = &cli.BoolFlag{
Name: "json-only",
Usage: "(debug): Forces the validator client to use JSON for communication with the beacon node when REST mode is enabled",
}
)
// devModeFlags holds list of flags that are set when development mode is on.

View File

@@ -1,6 +1,7 @@
package util
import (
"encoding/binary"
"math/big"
"testing"
@@ -109,21 +110,14 @@ func GenerateTestFuluBlockWithSidecars(t *testing.T, blobCount int, options ...F
block.Block.ParentRoot = generator.parent[:]
block.Block.ProposerIndex = generator.proposer
blobs := make([]kzg.Blob, 0, generator.blobCount)
commitments := make([][]byte, 0, generator.blobCount)
for i := range generator.blobCount {
blob := kzg.Blob{uint8(i)}
commitment, err := kzg.BlobToKZGCommitment(&blob)
require.NoError(t, err)
blobs = append(blobs, blob)
commitments = append(commitments, commitment[:])
block.Block.Body.BlobKzgCommitments = make([][]byte, blobCount)
for i := range blobCount {
var commitment [fieldparams.KzgCommitmentSize]byte
binary.LittleEndian.PutUint16(commitment[:16], uint16(i))
binary.LittleEndian.PutUint16(commitment[16:32], uint16(generator.slot))
block.Block.Body.BlobKzgCommitments[i] = commitment[:]
}
block.Block.Body.BlobKzgCommitments = commitments
body, err := blocks.NewBeaconBlockBody(block.Block.Body)
require.NoError(t, err)
@@ -155,30 +149,39 @@ func GenerateTestFuluBlockWithSidecars(t *testing.T, blobCount int, options ...F
root, err := block.Block.HashTreeRoot()
require.NoError(t, err)
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(block)
sbb, err := blocks.NewSignedBeaconBlock(block)
require.NoError(t, err)
sh, err := sbb.Header()
require.NoError(t, err)
blobs := make([]kzg.Blob, blobCount)
for i, commitment := range block.Block.Body.BlobKzgCommitments {
roSidecars := GenerateTestDenebBlobSidecar(t, root, sh, i, commitment, inclusion[i])
blobs[i] = kzg.Blob(roSidecars.Blob)
}
cellsAndProofs := GenerateCellsAndProofs(t, blobs)
sidecars, err := peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
dataColumns, err := peerdas.DataColumnSidecars(sbb, cellsAndProofs)
require.NoError(t, err)
roSidecars := make([]blocks.RODataColumn, 0, len(sidecars))
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(sidecars))
for _, sidecar := range sidecars {
roSidecar, err := blocks.NewRODataColumnWithRoot(sidecar, root)
roSidecars := make([]blocks.RODataColumn, 0, len(dataColumns))
roVerifiedSidecars := make([]blocks.VerifiedRODataColumn, 0, len(dataColumns))
for _, dataColumn := range dataColumns {
roSidecar, err := blocks.NewRODataColumnWithRoot(dataColumn, root)
require.NoError(t, err)
roVerifiedSidecar := blocks.NewVerifiedRODataColumn(roSidecar)
roSidecars = append(roSidecars, roSidecar)
verifiedRoSidecars = append(verifiedRoSidecars, roVerifiedSidecar)
roVerifiedSidecars = append(roVerifiedSidecars, roVerifiedSidecar)
}
roBlock, err := blocks.NewROBlockWithRoot(signedBeaconBlock, root)
rob, err := blocks.NewROBlockWithRoot(sbb, root)
require.NoError(t, err)
return roBlock, roSidecars, verifiedRoSidecars
return rob, roSidecars, roVerifiedSidecars
}
func GenerateCellsAndProofs(t testing.TB, blobs []kzg.Blob) []kzg.CellsAndProofs {

View File

@@ -82,6 +82,9 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
if features.Get().SSZOnly {
acceptHeaderString = api.OctetStreamMediaType
}
if features.Get().JSONOnly {
acceptHeaderString = api.JsonMediaType
}
req.Header.Set("Accept", acceptHeaderString)
httpResp, err := c.client.Do(req)
if err != nil {
@@ -115,7 +118,7 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
return nil, nil, errorJson
}
if features.Get().SSZOnly && contentType != api.OctetStreamMediaType {
if (features.Get().SSZOnly && contentType != api.OctetStreamMediaType) || (features.Get().JSONOnly && contentType != api.JsonMediaType) {
return nil, nil, errors.Errorf("server responded with non primary accept type %s", contentType)
}