Compare commits

...

63 Commits

Author SHA1 Message Date
Kasey Kirkham
bf0ccf05b2 remove duplicate HTR in call path 2025-09-14 17:13:59 -05:00
Kasey Kirkham
9aeb89cedd refactor to deduplicate sidecar construction code 2025-09-12 23:14:27 -05:00
Manu NALEPA
81d413ca73 Merge branch 'develop' into peerDAS-getBlobsV2 2025-09-11 16:04:34 +02:00
satushh
a55d61de9c remove redundant flag option 2025-08-29 19:02:51 +01:00
satushh
4faeac57e1 fix bad merge 2025-08-29 18:58:00 +01:00
satushh
16bed0a93f correct bad merge 2025-08-29 18:45:27 +01:00
satushh
0e9d5f4513 tidy up IsDataAvailable 2025-08-29 18:30:15 +01:00
satushh
65d6be0406 put samplesPerSlot at appropriate place 2025-08-29 18:23:34 +01:00
satushh
115e20de62 fix more tests 2025-08-29 18:23:34 +01:00
satushh
fb4028f736 fix test 2025-08-29 18:23:34 +01:00
satushh
af7470e835 remove redundant function and fix name 2025-08-29 18:23:34 +01:00
satushh
618fe1d1b2 blockchain: get variable samplesPerSlot only when required 2025-08-29 18:23:34 +01:00
satushh
71fe640eef execution: use service context instead of function's for retry 2025-08-29 18:23:34 +01:00
satushh
350fead19b execution: edge case - delete activeRetries on success 2025-08-29 18:23:34 +01:00
satushh
23645e549a sync: new appropriate mock service 2025-08-29 18:23:34 +01:00
satushh
2abe1adb29 sync: fix lint, test and add extra test for when data is actually not available 2025-08-29 18:23:34 +01:00
satushh
c9f3e111e6 lint: formatting and remove confusing comment 2025-08-29 18:23:34 +01:00
satushh
f8410908e9 blockchain: cleaner DA check 2025-08-29 18:23:34 +01:00
satushh
4c328977d9 execution: make reconstructSingleflight part of the service struct 2025-08-29 18:23:34 +01:00
satushh
b98a875194 blockchain: move IsDataAvailable interface to blockchain package 2025-08-29 18:23:34 +01:00
satushh
ee3a7b980a sync: don't call ReconstructDataColumnSidecars if not required 2025-08-29 18:23:34 +01:00
satushh
62dfe6bde5 execution: ensure single responsibility, execution should not do DA check 2025-08-29 18:23:34 +01:00
satushh
3a671abaa0 execution: ensure the retry actually happens when it needs to 2025-08-29 18:23:34 +01:00
satushh
ae6ca74c24 lint: format 2025-08-29 18:23:34 +01:00
satushh
bdd423e100 execution: retry logic inside ReconstructDataColumnSidecars itself 2025-08-29 18:23:34 +01:00
satushh
b9e31f258b lint: lint and use unused metrics 2025-08-29 18:23:34 +01:00
satushh
7c1eef5780 lint: formatting 2025-08-29 18:23:34 +01:00
satushh
88620a1a63 blockchain: fix CustodyGroupCount return 2025-08-29 18:23:34 +01:00
satushh
3c9a0b9511 bazel: bazel run //:gazelle -- fix 2025-08-29 18:23:34 +01:00
satushh
a8c5cfd871 sync: remove unwanted tests 2025-08-29 18:23:34 +01:00
satushh
d3b2122542 da: updated IsDataAvailable 2025-08-29 18:23:34 +01:00
satushh
a12b00c71c execution: retry atomicity test 2025-08-29 18:23:34 +01:00
satushh
358b9acc53 execution: fix test 2025-08-29 18:23:34 +01:00
satushh
aa31a16a9c sync: remove unwanted checks 2025-08-29 18:23:34 +01:00
satushh
86bbb6ee3b da: non blocking checks 2025-08-29 18:23:34 +01:00
satushh
139c2b887c exec: hardcode retry interval 2025-08-29 18:23:34 +01:00
satushh
88b03089d2 sync: no goroutine, getblobsv2 in absence of block as well, wrap error 2025-08-29 18:23:34 +01:00
satushh
bce0960422 engine: remove isDataAlreadyAvailable function 2025-08-29 18:23:34 +01:00
satushh
58f196c38c reconstruct: simplify multi goroutine case and avoid race condition 2025-08-29 18:23:34 +01:00
satushh
de63e24815 reconstruct: load once, correctly deliver the result to all waiting goroutines 2025-08-29 18:23:34 +01:00
satushh
c851987e49 beacon: default retry interval 2025-08-29 18:23:34 +01:00
satushh
141f9adff3 lint: remove unused field 2025-08-29 18:23:34 +01:00
satushh
47bf2bd985 sidecar: recover function and different context for retrying 2025-08-29 18:23:34 +01:00
satushh
fb28a17fd5 config: make retry interval configurable 2025-08-29 18:23:34 +01:00
satushh
338d4c29d2 lint: return error when it is not nil 2025-08-29 18:23:34 +01:00
satushh
35f6d1277d lint: fmt and log capitalisation 2025-08-29 18:23:34 +01:00
satushh
77b3d23c86 test: engine client and sync package, metrics 2025-08-29 18:23:34 +01:00
satushh
32e81a33dd getBlobsV2: retry if reconstruction isnt successful 2025-08-29 18:23:34 +01:00
Manu NALEPA
54ce47a839 Add DataColumnStorage and SubscribeAllDataSubnets flag. 2025-08-29 18:23:34 +01:00
Manu NALEPA
208febf884 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
b3c9cc2459 selectPeers: Avoid map with key but empty value. 2025-08-29 18:23:34 +01:00
Manu NALEPA
d8895ec49c Revert "Fix James' comment."
This reverts commit a3f919205a.
2025-08-29 18:23:34 +01:00
Manu NALEPA
5e593d50eb Revert "Fix Potuz's comment."
This reverts commit c45230b455.
2025-08-29 18:23:34 +01:00
Manu NALEPA
4878468047 Fix flakiness in TestSelectPeers. 2025-08-29 18:23:34 +01:00
Manu NALEPA
e10045cf8c Fix James' comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
b18c4b99c7 Implement TestSelectPeers. 2025-08-29 18:23:34 +01:00
Manu NALEPA
272a95934f Implement TestFetchDataColumnSidecarsFromPeers. 2025-08-29 18:23:34 +01:00
Manu NALEPA
e4582ed1e2 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
f2cc2e7128 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
84b1bc7635 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
c51a8f5965 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
fdb858473e Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
e2356bb7ad PeerDAS: Implement sync 2025-08-29 18:23:34 +01:00
41 changed files with 1233 additions and 642 deletions

View File

@@ -14,7 +14,10 @@ const BytesPerBlob = ckzg4844.BytesPerBlob
type Blob [BytesPerBlob]byte
// BytesPerCell is the number of bytes in a single cell.
const BytesPerCell = ckzg4844.BytesPerCell
const (
BytesPerCell = ckzg4844.BytesPerCell
BytesPerProof = ckzg4844.BytesPerProof
)
// Cell represents a chunk of an encoded Blob.
type Cell [BytesPerCell]byte
@@ -23,7 +26,7 @@ type Cell [BytesPerCell]byte
type Commitment [48]byte
// Proof represents a KZG proof that attests to the validity of a Blob or parts of it.
type Proof [48]byte
type Proof [BytesPerProof]byte
// Bytes48 is a 48-byte array.
type Bytes48 = ckzg4844.Bytes48

View File

@@ -664,14 +664,14 @@ func missingDataColumnIndices(store *filesystem.DataColumnStorage, root [fieldpa
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
func (s *Service) isDataAvailable(
ctx context.Context,
root [fieldparams.RootLength]byte,
signedBlock interfaces.ReadOnlySignedBeaconBlock,
roBlock consensusblocks.ROBlock,
) error {
block := signedBlock.Block()
block := roBlock.Block()
if block == nil {
return errors.New("invalid nil beacon block")
}
root := roBlock.Root()
blockVersion := block.Version()
if blockVersion >= version.Fulu {
return s.areDataColumnsAvailable(ctx, root, block)
@@ -691,8 +691,6 @@ func (s *Service) areDataColumnsAvailable(
root [fieldparams.RootLength]byte,
block interfaces.ReadOnlyBeaconBlock,
) error {
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
// We are only required to check within MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
blockSlot, currentSlot := block.Slot(), s.CurrentSlot()
blockEpoch, currentEpoch := slots.ToEpoch(blockSlot), slots.ToEpoch(currentSlot)
@@ -726,6 +724,7 @@ func (s *Service) areDataColumnsAvailable(
// Compute the sampling size.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
samplingSize := max(samplesPerSlot, custodyGroupCount)
// Get the peer info for the node.

View File

@@ -2952,14 +2952,18 @@ func TestIsDataAvailable(t *testing.T) {
params := testIsAvailableParams{options: []Option{WithGenesisTime(time.Unix(0, 0))}}
ctx, _, service, root, signed := testIsAvailableSetup(t, params)
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
t.Run("Fulu - no commitment in blocks", func(t *testing.T) {
ctx, _, service, root, signed := testIsAvailableSetup(t, testIsAvailableParams{})
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -2977,7 +2981,9 @@ func TestIsDataAvailable(t *testing.T) {
ctx, _, service, root, signed := testIsAvailableSetup(t, params)
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -2989,7 +2995,9 @@ func TestIsDataAvailable(t *testing.T) {
ctx, _, service, root, signed := testIsAvailableSetup(t, params)
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -3037,7 +3045,9 @@ func TestIsDataAvailable(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
defer cancel()
err = service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -3099,7 +3109,9 @@ func TestIsDataAvailable(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
defer cancel()
err = service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -3118,7 +3130,9 @@ func TestIsDataAvailable(t *testing.T) {
cancel()
}()
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NotNil(t, err)
})
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/slasher/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
"github.com/OffchainLabs/prysm/v6/config/features"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -112,7 +111,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
return err
}
daWaitedTime, err := s.handleDA(ctx, blockCopy, blockRoot, avs)
daWaitedTime, err := s.handleDA(ctx, avs, roblock)
if err != nil {
return err
}
@@ -240,37 +239,19 @@ func (s *Service) validateExecutionAndConsensus(
return postState, isValidPayload, nil
}
func (s *Service) handleDA(
ctx context.Context,
block interfaces.SignedBeaconBlock,
blockRoot [fieldparams.RootLength]byte,
avs das.AvailabilityStore,
) (elapsed time.Duration, err error) {
defer func(start time.Time) {
elapsed = time.Since(start)
if err == nil {
dataAvailWaitedTime.Observe(float64(elapsed.Milliseconds()))
}
}(time.Now())
if avs == nil {
if err = s.isDataAvailable(ctx, blockRoot, block); err != nil {
return
}
return
func (s *Service) handleDA(ctx context.Context, avs das.AvailabilityStore, block blocks.ROBlock) (time.Duration, error) {
var err error
start := time.Now()
if avs != nil {
err = avs.IsDataAvailable(ctx, s.CurrentSlot(), block)
} else {
err = s.isDataAvailable(ctx, block)
}
var rob blocks.ROBlock
rob, err = blocks.NewROBlockWithRoot(block, blockRoot)
elapsed := time.Since(start)
if err != nil {
return
dataAvailWaitedTime.Observe(float64(elapsed.Milliseconds()))
}
err = avs.IsDataAvailable(ctx, s.CurrentSlot(), rob)
return
return elapsed, err
}
func (s *Service) reportPostBlockProcessing(

View File

@@ -192,7 +192,9 @@ func TestHandleDA(t *testing.T) {
require.NoError(t, err)
s, _ := minimalTestService(t)
elapsed, err := s.handleDA(t.Context(), signedBeaconBlock, [fieldparams.RootLength]byte{}, nil)
block, err := blocks.NewROBlockWithRoot(signedBeaconBlock, [32]byte{})
require.NoError(t, err)
elapsed, err := s.handleDA(t.Context(), nil, block)
require.NoError(t, err)
require.Equal(t, true, elapsed > 0, "Elapsed time should be greater than 0")
}

View File

@@ -24,8 +24,8 @@ import (
p2pTesting "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
@@ -89,7 +89,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
func (mb *mockBroadcaster) BroadcastDataColumnSidecar(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
func (mb *mockBroadcaster) BroadcastDataColumnSidecar(_ uint64, _ blocks.VerifiedRODataColumn) error {
mb.broadcastCalled = true
return nil
}

View File

@@ -5,6 +5,7 @@ go_library(
srcs = [
"das_core.go",
"info.go",
"log.go",
"metrics.go",
"p2p_interface.go",
"reconstruction.go",
@@ -19,7 +20,6 @@ go_library(
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/trie:go_default_library",
"//crypto/hash:go_default_library",
@@ -33,6 +33,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -4,15 +4,10 @@ import (
"encoding/binary"
"math"
"slices"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/crypto/hash"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/holiman/uint256"
"github.com/pkg/errors"
@@ -20,12 +15,9 @@ import (
var (
// Custom errors
ErrCustodyGroupTooLarge = errors.New("custody group too large")
ErrCustodyGroupCountTooLarge = errors.New("custody group count too large")
ErrSizeMismatch = errors.New("mismatch in the number of blob KZG commitments and cellsAndProofs")
ErrNotEnoughDataColumnSidecars = errors.New("not enough columns")
ErrDataColumnSidecarsNotSortedByIndex = errors.New("data column sidecars are not sorted by index")
errWrongComputedCustodyGroupCount = errors.New("wrong computed custody group count, should never happen")
ErrCustodyGroupTooLarge = errors.New("custody group too large")
ErrCustodyGroupCountTooLarge = errors.New("custody group count too large")
errWrongComputedCustodyGroupCount = errors.New("wrong computed custody group count, should never happen")
// maxUint256 is the maximum value of an uint256.
maxUint256 = &uint256.Int{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64}
@@ -117,44 +109,6 @@ func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) {
return columns, nil
}
// DataColumnSidecars computes the data column sidecars from the signed block, cells and cell proofs.
// The returned value contains pointers to function parameters.
// (If the caller alterates `cellsAndProofs` afterwards, the returned value will be modified as well.)
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#get_data_column_sidecars_from_block
func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, cellsAndProofs []kzg.CellsAndProofs) ([]*ethpb.DataColumnSidecar, error) {
if signedBlock == nil || signedBlock.IsNil() || len(cellsAndProofs) == 0 {
return nil, nil
}
block := signedBlock.Block()
blockBody := block.Body()
blobKzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "blob KZG commitments")
}
if len(blobKzgCommitments) != len(cellsAndProofs) {
return nil, ErrSizeMismatch
}
signedBlockHeader, err := signedBlock.Header()
if err != nil {
return nil, errors.Wrap(err, "signed block header")
}
kzgCommitmentsInclusionProof, err := blocks.MerkleProofKZGCommitments(blockBody)
if err != nil {
return nil, errors.Wrap(err, "merkle proof KZG commitments")
}
dataColumnSidecars, err := dataColumnsSidecars(signedBlockHeader, blobKzgCommitments, kzgCommitmentsInclusionProof, cellsAndProofs)
if err != nil {
return nil, errors.Wrap(err, "data column sidecars")
}
return dataColumnSidecars, nil
}
// ComputeCustodyGroupForColumn computes the custody group for a given column.
// It is the reciprocal function of ComputeColumnsForCustodyGroup.
func ComputeCustodyGroupForColumn(columnIndex uint64) (uint64, error) {
@@ -194,72 +148,3 @@ func CustodyColumns(custodyGroups []uint64) (map[uint64]bool, error) {
return columns, nil
}
// dataColumnsSidecars computes the data column sidecars from the signed block header, the blob KZG commiments,
// the KZG commitment includion proofs and cells and cell proofs.
// The returned value contains pointers to function parameters.
// (If the caller alterates input parameters afterwards, the returned value will be modified as well.)
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#get_data_column_sidecars
func dataColumnsSidecars(
signedBlockHeader *ethpb.SignedBeaconBlockHeader,
blobKzgCommitments [][]byte,
kzgCommitmentsInclusionProof [][]byte,
cellsAndProofs []kzg.CellsAndProofs,
) ([]*ethpb.DataColumnSidecar, error) {
start := time.Now()
if len(blobKzgCommitments) != len(cellsAndProofs) {
return nil, ErrSizeMismatch
}
numberOfColumns := params.BeaconConfig().NumberOfColumns
blobsCount := len(cellsAndProofs)
sidecars := make([]*ethpb.DataColumnSidecar, 0, numberOfColumns)
for columnIndex := range numberOfColumns {
column := make([]kzg.Cell, 0, blobsCount)
kzgProofOfColumn := make([]kzg.Proof, 0, blobsCount)
for rowIndex := range blobsCount {
cellsForRow := cellsAndProofs[rowIndex].Cells
proofsForRow := cellsAndProofs[rowIndex].Proofs
// Validate that we have enough cells and proofs for this column index
if columnIndex >= uint64(len(cellsForRow)) {
return nil, errors.Errorf("column index %d exceeds cells length %d for blob %d", columnIndex, len(cellsForRow), rowIndex)
}
if columnIndex >= uint64(len(proofsForRow)) {
return nil, errors.Errorf("column index %d exceeds proofs length %d for blob %d", columnIndex, len(proofsForRow), rowIndex)
}
cell := cellsForRow[columnIndex]
column = append(column, cell)
kzgProof := proofsForRow[columnIndex]
kzgProofOfColumn = append(kzgProofOfColumn, kzgProof)
}
columnBytes := make([][]byte, 0, blobsCount)
for i := range column {
columnBytes = append(columnBytes, column[i][:])
}
kzgProofOfColumnBytes := make([][]byte, 0, blobsCount)
for _, kzgProof := range kzgProofOfColumn {
kzgProofOfColumnBytes = append(kzgProofOfColumnBytes, kzgProof[:])
}
sidecar := &ethpb.DataColumnSidecar{
Index: columnIndex,
Column: columnBytes,
KzgCommitments: blobKzgCommitments,
KzgProofs: kzgProofOfColumnBytes,
SignedBlockHeader: signedBlockHeader,
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
sidecars = append(sidecars, sidecar)
}
dataColumnComputationTime.Observe(float64(time.Since(start).Milliseconds()))
return sidecars, nil
}

View File

@@ -3,13 +3,9 @@ package peerdas_test
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/ethereum/go-ethereum/p2p/enode"
)
@@ -31,93 +27,6 @@ func TestComputeColumnsForCustodyGroup(t *testing.T) {
require.ErrorIs(t, err, peerdas.ErrCustodyGroupTooLarge)
}
func TestDataColumnSidecars(t *testing.T) {
t.Run("nil signed block", func(t *testing.T) {
var expected []*ethpb.DataColumnSidecar = nil
actual, err := peerdas.DataColumnSidecars(nil, []kzg.CellsAndProofs{})
require.NoError(t, err)
require.DeepSSZEqual(t, expected, actual)
})
t.Run("empty cells and proofs", func(t *testing.T) {
// Create a protobuf signed beacon block.
signedBeaconBlockPb := util.NewBeaconBlockDeneb()
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
actual, err := peerdas.DataColumnSidecars(signedBeaconBlock, []kzg.CellsAndProofs{})
require.NoError(t, err)
require.IsNil(t, actual)
})
t.Run("sizes mismatch", func(t *testing.T) {
// Create a protobuf signed beacon block.
signedBeaconBlockPb := util.NewBeaconBlockDeneb()
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs.
cellsAndProofs := make([]kzg.CellsAndProofs, 1)
_, err = peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
require.ErrorIs(t, err, peerdas.ErrSizeMismatch)
})
t.Run("cells array too short for column index", func(t *testing.T) {
// Create a Fulu block with a blob commitment.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{make([]byte, 48)}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with insufficient cells for the number of columns.
// This simulates a scenario where cellsAndProofs has fewer cells than expected columns.
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, 10), // Only 10 cells
Proofs: make([]kzg.Proof, 10), // Only 10 proofs
},
}
// This should fail because the function will try to access columns up to NumberOfColumns
// but we only have 10 cells/proofs.
_, err = peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
require.ErrorContains(t, "column index", err)
require.ErrorContains(t, "exceeds cells length", err)
})
t.Run("proofs array too short for column index", func(t *testing.T) {
// Create a Fulu block with a blob commitment.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{make([]byte, 48)}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with sufficient cells but insufficient proofs.
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, 5), // Only 5 proofs, less than columns
},
}
// This should fail when trying to access proof beyond index 4.
_, err = peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
require.ErrorContains(t, "column index", err)
require.ErrorContains(t, "exceeds proofs length", err)
})
}
func TestComputeCustodyGroupForColumn(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()

View File

@@ -63,17 +63,14 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
t.Run("invalid proof", func(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0][0]++ // It is OK to overflow
roDataColumnSidecars := generateRODataColumnSidecars(t, sidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(roDataColumnSidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.ErrorIs(t, err, peerdas.ErrInvalidKZGProof)
})
t.Run("nominal", func(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
roDataColumnSidecars := generateRODataColumnSidecars(t, sidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(roDataColumnSidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.NoError(t, err)
})
}
@@ -256,9 +253,8 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_SameCommitments_NoBatch(b *testin
for i := range int64(b.N) {
// Generate new random sidecars to ensure the KZG backend does not cache anything.
sidecars := generateRandomSidecars(b, i, blobCount)
roDataColumnSidecars := generateRODataColumnSidecars(b, sidecars)
for _, sidecar := range roDataColumnSidecars {
for _, sidecar := range sidecars {
sidecars := []blocks.RODataColumn{sidecar}
b.StartTimer()
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
@@ -282,7 +278,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch(b *testing.
b.ResetTimer()
for j := range int64(b.N) {
allSidecars := make([]*ethpb.DataColumnSidecar, 0, numberOfColumns)
allSidecars := make([]blocks.RODataColumn, 0, numberOfColumns)
for k := int64(0); k < numberOfColumns; k += columnsCount {
// Use different seeds to generate different blobs/commitments
seed := int64(b.N*i) + numberOfColumns*j + blobCount*k
@@ -292,10 +288,8 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch(b *testing.
allSidecars = append(allSidecars, sidecars[k:k+columnsCount]...)
}
roDataColumnSidecars := generateRODataColumnSidecars(b, allSidecars)
b.StartTimer()
err := peerdas.VerifyDataColumnsSidecarKZGProofs(roDataColumnSidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allSidecars)
b.StopTimer()
require.NoError(b, err)
}
@@ -323,8 +317,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch4(b *testing
for j := range int64(batchCount) {
// Use different seeds to generate different blobs/commitments
sidecars := generateRandomSidecars(b, int64(batchCount)*i+j*blobCount, blobCount)
roDataColumnSidecars := generateRODataColumnSidecars(b, sidecars[:columnsCount])
allSidecars = append(allSidecars, roDataColumnSidecars)
allSidecars = append(allSidecars, sidecars)
}
for _, sidecars := range allSidecars {
@@ -358,7 +351,7 @@ func createTestSidecar(t *testing.T, index uint64, column, kzgCommitments, kzgPr
return roSidecar
}
func generateRandomSidecars(t testing.TB, seed, blobCount int64) []*ethpb.DataColumnSidecar {
func generateRandomSidecars(t testing.TB, seed, blobCount int64) []blocks.RODataColumn {
dbBlock := util.NewBeaconBlockDeneb()
commitments := make([][]byte, 0, blobCount)
@@ -379,20 +372,10 @@ func generateRandomSidecars(t testing.TB, seed, blobCount int64) []*ethpb.DataCo
require.NoError(t, err)
cellsAndProofs := util.GenerateCellsAndProofs(t, blobs)
sidecars, err := peerdas.DataColumnSidecars(sBlock, cellsAndProofs)
rob, err := blocks.NewROBlock(sBlock)
require.NoError(t, err)
sidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.NoError(t, err)
return sidecars
}
func generateRODataColumnSidecars(t testing.TB, sidecars []*ethpb.DataColumnSidecar) []blocks.RODataColumn {
roDataColumnSidecars := make([]blocks.RODataColumn, 0, len(sidecars))
for _, sidecar := range sidecars {
roCol, err := blocks.NewRODataColumn(sidecar)
require.NoError(t, err)
roDataColumnSidecars = append(roDataColumnSidecars, roCol)
}
return roDataColumnSidecars
}

View File

@@ -5,7 +5,6 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
@@ -62,12 +61,6 @@ func ReconstructDataColumnSidecars(inVerifiedRoSidecars []blocks.VerifiedRODataC
return nil, ErrNotEnoughDataColumnSidecars
}
// Sidecars are verified and are committed to the same block.
// All signed block headers, KZG commitments, and inclusion proofs are the same.
signedBlockHeader := referenceSidecar.SignedBlockHeader
kzgCommitments := referenceSidecar.KzgCommitments
kzgCommitmentsInclusionProof := referenceSidecar.KzgCommitmentsInclusionProof
// Recover cells and compute proofs in parallel.
var wg errgroup.Group
cellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
@@ -100,7 +93,7 @@ func ReconstructDataColumnSidecars(inVerifiedRoSidecars []blocks.VerifiedRODataC
return nil, errors.Wrap(err, "wait for RecoverCellsAndKZGProofs")
}
outSidecars, err := dataColumnsSidecars(signedBlockHeader, kzgCommitments, kzgCommitmentsInclusionProof, cellsAndProofs)
outSidecars, err := ConstructDataColumnSidecar(cellsAndProofs, PopulateFromSidecar(referenceSidecar.RODataColumn))
if err != nil {
return nil, errors.Wrap(err, "data column sidecars from items")
}
@@ -109,71 +102,13 @@ func ReconstructDataColumnSidecars(inVerifiedRoSidecars []blocks.VerifiedRODataC
// As a consequence, reconstructed sidecars are also verified.
outVerifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(outSidecars))
for _, sidecar := range outSidecars {
roSidecar, err := blocks.NewRODataColumnWithRoot(sidecar, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "new RO data column with root")
}
verifiedRoSidecar := blocks.NewVerifiedRODataColumn(roSidecar)
verifiedRoSidecar := blocks.NewVerifiedRODataColumn(sidecar)
outVerifiedRoSidecars = append(outVerifiedRoSidecars, verifiedRoSidecar)
}
return outVerifiedRoSidecars, nil
}
// ConstructDataColumnSidecars constructs data column sidecars from a block, (un-extended) blobs and
// cell proofs corresponding the extended blobs. The main purpose of this function is to
// construct data column sidecars from data obtained from the execution client via:
// - `engine_getBlobsV2` - https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#engine_getblobsv2, or
// - `engine_getPayloadV5` - https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#engine_getpayloadv5
// Note: In this function, to stick with the `BlobsBundleV2` format returned by the execution client in `engine_getPayloadV5`,
// cell proofs are "flattened".
func ConstructDataColumnSidecars(block interfaces.ReadOnlySignedBeaconBlock, blobs [][]byte, cellProofs [][]byte) ([]*ethpb.DataColumnSidecar, error) {
// Check if the cells count is equal to the cell proofs count.
numberOfColumns := params.BeaconConfig().NumberOfColumns
blobCount := uint64(len(blobs))
cellProofsCount := uint64(len(cellProofs))
cellsCount := blobCount * numberOfColumns
if cellsCount != cellProofsCount {
return nil, ErrBlobsCellsProofsMismatch
}
cellsAndProofs := make([]kzg.CellsAndProofs, 0, blobCount)
for i, blob := range blobs {
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blob) != len(kzgBlob) {
return nil, errors.New("wrong blob size - should never happen")
}
// Compute the extended cells from the (non-extended) blob.
cells, err := kzg.ComputeCells(&kzgBlob)
if err != nil {
return nil, errors.Wrap(err, "compute cells")
}
var proofs []kzg.Proof
for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ {
var kzgProof kzg.Proof
if copy(kzgProof[:], cellProofs[idx]) != len(kzgProof) {
return nil, errors.New("wrong KZG proof size - should never happen")
}
proofs = append(proofs, kzgProof)
}
cellsProofs := kzg.CellsAndProofs{Cells: cells, Proofs: proofs}
cellsAndProofs = append(cellsAndProofs, cellsProofs)
}
dataColumnSidecars, err := DataColumnSidecars(block, cellsAndProofs)
if err != nil {
return nil, errors.Wrap(err, "data column sidcars")
}
return dataColumnSidecars, nil
}
// ReconstructBlobs constructs verified read only blobs sidecars from verified read only blob sidecars.
// The following constraints must be satisfied:
// - All `dataColumnSidecars` has to be committed to the same block, and
@@ -256,6 +191,47 @@ func ReconstructBlobs(block blocks.ROBlock, verifiedDataColumnSidecars []blocks.
return blobSidecars, nil
}
// ComputeCellsAndProofs computes the cells and proofs from blobs and cell proofs.
func ComputeCellsAndProofs(blobs [][]byte, cellProofs [][]byte) ([]kzg.CellsAndProofs, error) {
numberOfColumns := params.BeaconConfig().NumberOfColumns
blobCount := uint64(len(blobs))
cellProofsCount := uint64(len(cellProofs))
cellsCount := blobCount * numberOfColumns
if cellsCount != cellProofsCount {
return nil, ErrBlobsCellsProofsMismatch
}
cellsAndProofs := make([]kzg.CellsAndProofs, 0, blobCount)
for i, blob := range blobs {
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blob) != len(kzgBlob) {
return nil, errors.New("wrong blob size - should never happen")
}
// Compute the extended cells from the (non-extended) blob.
cells, err := kzg.ComputeCells(&kzgBlob)
if err != nil {
return nil, errors.Wrap(err, "compute cells")
}
var proofs []kzg.Proof
for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ {
var kzgProof kzg.Proof
if copy(kzgProof[:], cellProofs[idx]) != len(kzgProof) {
return nil, errors.New("wrong KZG proof size - should never happen")
}
proofs = append(proofs, kzgProof)
}
cellsProofs := kzg.CellsAndProofs{Cells: cells, Proofs: proofs}
cellsAndProofs = append(cellsAndProofs, cellsProofs)
}
return cellsAndProofs, nil
}
// blobSidecarsFromDataColumnSidecars converts verified data column sidecars to verified blob sidecars.
func blobSidecarsFromDataColumnSidecars(roBlock blocks.ROBlock, dataColumnSidecars []blocks.VerifiedRODataColumn, indices []int) ([]*blocks.VerifiedROBlob, error) {
referenceSidecar := dataColumnSidecars[0]

View File

@@ -9,7 +9,6 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/pkg/errors"
@@ -124,50 +123,6 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
})
}
func TestConstructDataColumnSidecars(t *testing.T) {
const (
blobCount = 3
cellsPerBlob = fieldparams.CellsPerBlob
)
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
roBlock, _, baseVerifiedRoSidecars := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
// Extract blobs and proofs from the sidecars.
blobs := make([][]byte, 0, blobCount)
cellProofs := make([][]byte, 0, cellsPerBlob)
for blobIndex := range blobCount {
blob := make([]byte, 0, cellsPerBlob)
for columnIndex := range cellsPerBlob {
cell := baseVerifiedRoSidecars[columnIndex].Column[blobIndex]
blob = append(blob, cell...)
}
blobs = append(blobs, blob)
for columnIndex := range numberOfColumns {
cellProof := baseVerifiedRoSidecars[columnIndex].KzgProofs[blobIndex]
cellProofs = append(cellProofs, cellProof)
}
}
actual, err := peerdas.ConstructDataColumnSidecars(roBlock, blobs, cellProofs)
require.NoError(t, err)
// Extract the base verified ro sidecars into sidecars.
expected := make([]*ethpb.DataColumnSidecar, 0, len(baseVerifiedRoSidecars))
for _, verifiedRoSidecar := range baseVerifiedRoSidecars {
expected = append(expected, verifiedRoSidecar.DataColumnSidecar)
}
require.DeepSSZEqual(t, expected, actual)
}
func TestReconstructBlobs(t *testing.T) {
// Start the trusted setup.
err := kzg.Start()
@@ -250,7 +205,7 @@ func TestReconstructBlobs(t *testing.T) {
// Compute cells and proofs from blob sidecars.
var wg errgroup.Group
blobs := make([][]byte, blobCount)
cellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
inputCellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
for i := range blobCount {
blob := roBlobSidecars[i].Blob
blobs[i] = blob
@@ -267,7 +222,7 @@ func TestReconstructBlobs(t *testing.T) {
// It is safe for multiple goroutines to concurrently write to the same slice,
// as long as they are writing to different indices, which is the case here.
cellsAndProofs[i] = cp
inputCellsAndProofs[i] = cp
return nil
})
@@ -278,25 +233,24 @@ func TestReconstructBlobs(t *testing.T) {
// Flatten proofs.
cellProofs := make([][]byte, 0, blobCount*numberOfColumns)
for _, cp := range cellsAndProofs {
for _, cp := range inputCellsAndProofs {
for _, proof := range cp.Proofs {
cellProofs = append(cellProofs, proof[:])
}
}
// Construct data column sidecars.
// It is OK to use the public function `ConstructDataColumnSidecars`, as long as
// `TestConstructDataColumnSidecars` tests pass.
dataColumnSidecars, err := peerdas.ConstructDataColumnSidecars(roBlock, blobs, cellProofs)
// Compute celles and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofs(blobs, cellProofs)
require.NoError(t, err)
// Construct data column sidears from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
// Convert to verified data column sidecars.
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(dataColumnSidecars))
for _, dataColumnSidecar := range dataColumnSidecars {
roSidecar, err := blocks.NewRODataColumn(dataColumnSidecar)
require.NoError(t, err)
verifiedRoSidecar := blocks.NewVerifiedRODataColumn(roSidecar)
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roDataColumnSidecars))
for _, roDataColumnSidecar := range roDataColumnSidecars {
verifiedRoSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar)
verifiedRoSidecars = append(verifiedRoSidecars, verifiedRoSidecar)
}
@@ -339,3 +293,86 @@ func TestReconstructBlobs(t *testing.T) {
})
}
func TestComputeCellsAndProofs(t *testing.T) {
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
t.Run("mismatched blob and proof counts", func(t *testing.T) {
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Create one blob but proofs for two blobs
blobs := [][]byte{{}}
// Create proofs for 2 blobs worth of columns
cellProofs := make([][]byte, 2*numberOfColumns)
_, err := peerdas.ComputeCellsAndProofs(blobs, cellProofs)
require.ErrorIs(t, err, peerdas.ErrBlobsCellsProofsMismatch)
})
t.Run("nominal", func(t *testing.T) {
const blobCount = 2
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Generate test blobs
_, roBlobSidecars := util.GenerateTestElectraBlockWithSidecar(t, [fieldparams.RootLength]byte{}, 42, blobCount)
// Extract blobs and compute expected cells and proofs
blobs := make([][]byte, blobCount)
expectedCellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
var wg errgroup.Group
for i := range blobCount {
blob := roBlobSidecars[i].Blob
blobs[i] = blob
wg.Go(func() error {
var kzgBlob kzg.Blob
count := copy(kzgBlob[:], blob)
require.Equal(t, len(kzgBlob), count)
cp, err := kzg.ComputeCellsAndKZGProofs(&kzgBlob)
if err != nil {
return errors.Wrapf(err, "compute cells and kzg proofs for blob %d", i)
}
expectedCellsAndProofs[i] = cp
return nil
})
}
err := wg.Wait()
require.NoError(t, err)
// Flatten proofs
cellProofs := make([][]byte, 0, blobCount*numberOfColumns)
for _, cp := range expectedCellsAndProofs {
for _, proof := range cp.Proofs {
cellProofs = append(cellProofs, proof[:])
}
}
// Test ComputeCellsAndProofs
actualCellsAndProofs, err := peerdas.ComputeCellsAndProofs(blobs, cellProofs)
require.NoError(t, err)
require.Equal(t, blobCount, len(actualCellsAndProofs))
// Verify the results match expected
for i := range blobCount {
require.Equal(t, len(expectedCellsAndProofs[i].Cells), len(actualCellsAndProofs[i].Cells))
require.Equal(t, len(expectedCellsAndProofs[i].Proofs), len(actualCellsAndProofs[i].Proofs))
// Compare cells
for j, expectedCell := range expectedCellsAndProofs[i].Cells {
require.Equal(t, expectedCell, actualCellsAndProofs[i].Cells[j])
}
// Compare proofs
for j, expectedProof := range expectedCellsAndProofs[i].Proofs {
require.Equal(t, expectedProof, actualCellsAndProofs[i].Proofs[j])
}
}
})
}

View File

@@ -1,12 +1,24 @@
package peerdas
import (
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
beaconState "github.com/OffchainLabs/prysm/v6/beacon-chain/state"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/pkg/errors"
)
var (
ErrNilSignedBlockOrEmptyCellsAndProofs = errors.New("nil signed block or empty cells and proofs")
ErrSizeMismatch = errors.New("mismatch in the number of blob KZG commitments and cellsAndProofs")
ErrNotEnoughDataColumnSidecars = errors.New("not enough columns")
ErrDataColumnSidecarsNotSortedByIndex = errors.New("data column sidecars are not sorted by index")
)
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
@@ -28,3 +40,154 @@ func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validat
count := totalNodeBalance / balancePerAdditionalCustodyGroup
return min(max(count, validatorCustodyRequirement), numberOfCustodyGroups), nil
}
// ConstructDataColumnSidecar, given ConstructionPopulator and the cells/proofs associated with each blob in the
// block, assembles sidecars which can be distributed to peers.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#get_data_column_sidecars_from_block
func ConstructDataColumnSidecar(rows []kzg.CellsAndProofs, src ConstructionPopulator) ([]blocks.RODataColumn, error) {
if len(rows) == 0 {
return nil, nil
}
start := time.Now()
cells, proofs, err := rotateRowsToCols(rows, params.BeaconConfig().NumberOfColumns)
if err != nil {
return nil, errors.Wrap(err, "rotate cells and proofs")
}
maxIdx := params.BeaconConfig().NumberOfColumns
roSidecars := make([]blocks.RODataColumn, 0, maxIdx)
for idx := range maxIdx {
sidecar := &ethpb.DataColumnSidecar{
Index: idx,
Column: cells[idx],
KzgProofs: proofs[idx],
}
if err := src.Populate(sidecar); err != nil {
return nil, errors.Wrap(err, "column field setter set")
}
if len(sidecar.KzgCommitments) != len(sidecar.Column) || len(sidecar.KzgCommitments) != len(sidecar.KzgProofs) {
return nil, ErrSizeMismatch
}
roSidecar, err := blocks.NewRODataColumnWithRoot(sidecar, src.Root())
if err != nil {
return nil, errors.Wrap(err, "new ro data column")
}
roSidecars = append(roSidecars, roSidecar)
}
dataColumnComputationTime.Observe(float64(time.Since(start).Milliseconds()))
return roSidecars, nil
}
// rotateRowsToCols takes a 2D slice of cells and proofs, where the x is rows (blobs) and y is columns,
// and returns a 2D slice where x is columns and y is rows.
func rotateRowsToCols(rows []kzg.CellsAndProofs, numCols uint64) ([][][]byte, [][][]byte, error) {
if len(rows) == 0 {
return nil, nil, nil
}
cellCols := make([][][]byte, numCols)
proofCols := make([][][]byte, numCols)
for i, cp := range rows {
if uint64(len(cp.Cells)) != numCols {
return nil, nil, errors.Wrap(ErrNotEnoughDataColumnSidecars, "not enough cells")
}
if len(cp.Cells) != len(cp.Proofs) {
return nil, nil, errors.Wrap(ErrNotEnoughDataColumnSidecars, "not enough proofs")
}
for j := uint64(0); j < numCols; j++ {
if i == 0 {
cellCols[j] = make([][]byte, len(rows))
proofCols[j] = make([][]byte, len(rows))
}
cellCols[j][i] = cp.Cells[j][:]
proofCols[j][i] = cp.Proofs[j][:]
}
}
return cellCols, proofCols, nil
}
// ConstructionPopulator is an interface that can be satisfied by a type that can use data from a struct
// like a DataColumnSidecar or a BeaconBlock to set the fields in a data column sidecar that cannot
// be obtained from the engine api.
type ConstructionPopulator interface {
Populate(*ethpb.DataColumnSidecar) error
Slot() primitives.Slot
Root() [32]byte
Commitments() [][]byte
Type() string
}
func PopulateFromSidecar(sidecar blocks.RODataColumn) *SidecarReconstructionSource {
return &SidecarReconstructionSource{RODataColumn: sidecar}
}
type SidecarReconstructionSource struct {
blocks.RODataColumn
}
func (s *SidecarReconstructionSource) Populate(dc *ethpb.DataColumnSidecar) error {
dc.SignedBlockHeader = s.SignedBlockHeader
dc.KzgCommitments = s.KzgCommitments
dc.KzgCommitmentsInclusionProof = s.KzgCommitmentsInclusionProof
return nil
}
func (s *SidecarReconstructionSource) Root() [32]byte {
return s.BlockRoot()
}
func (s *SidecarReconstructionSource) Commitments() [][]byte {
return s.KzgCommitments
}
func (s *SidecarReconstructionSource) Type() string {
return "DataColumnSidecar"
}
var _ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
func PopulateFromBlock(block blocks.ROBlock) *BlockReconstructionSource {
return &BlockReconstructionSource{ROBlock: block}
}
type BlockReconstructionSource struct {
blocks.ROBlock
}
func (b *BlockReconstructionSource) Populate(dc *ethpb.DataColumnSidecar) error {
block := b.Block()
blockBody := block.Body()
blobKzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
return errors.Wrap(err, "blob KZG commitments")
}
dc.KzgCommitments = blobKzgCommitments
dc.SignedBlockHeader, err = b.Header()
if err != nil {
return errors.Wrap(err, "signed block header")
}
dc.KzgCommitmentsInclusionProof, err = blocks.MerkleProofKZGCommitments(blockBody)
if err != nil {
return errors.Wrap(err, "merkle proof KZG commitments")
}
return nil
}
func (s *BlockReconstructionSource) Slot() primitives.Slot {
return s.Block().Slot()
}
func (s *BlockReconstructionSource) Commitments() [][]byte {
c, err := s.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithField("root", s.Root()).Trace("Unable to get kzg commitments from block")
}
return c
}
func (s *BlockReconstructionSource) Type() string {
return "BeaconBlock"
}
var _ ConstructionPopulator = (*BlockReconstructionSource)(nil)

View File

@@ -3,11 +3,15 @@ package peerdas_test
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
state_native "github.com/OffchainLabs/prysm/v6/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
)
func TestValidatorsCustodyRequirement(t *testing.T) {
@@ -53,3 +57,248 @@ func TestValidatorsCustodyRequirement(t *testing.T) {
})
}
}
func TestDataColumnSidecarsFromBlock(t *testing.T) {
t.Run("sizes mismatch", func(t *testing.T) {
// Create a protobuf signed beacon block.
signedBeaconBlockPb := util.NewBeaconBlockDeneb()
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs.
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, params.BeaconConfig().NumberOfColumns),
Proofs: make([]kzg.Proof, params.BeaconConfig().NumberOfColumns),
},
}
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
_, err = peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.ErrorIs(t, err, peerdas.ErrSizeMismatch)
})
t.Run("cells array too short for column index", func(t *testing.T) {
// Create a Fulu block with a blob commitment.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{make([]byte, 48)}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with insufficient cells for the number of columns.
// This simulates a scenario where cellsAndProofs has fewer cells than expected columns.
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, 10), // Only 10 cells
Proofs: make([]kzg.Proof, 10), // Only 10 proofs
},
}
// This should fail because the function will try to access columns up to NumberOfColumns
// but we only have 10 cells/proofs.
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
_, err = peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.ErrorIs(t, err, peerdas.ErrNotEnoughDataColumnSidecars)
})
t.Run("proofs array too short for column index", func(t *testing.T) {
// Create a Fulu block with a blob commitment.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{make([]byte, 48)}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with sufficient cells but insufficient proofs.
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, 5), // Only 5 proofs, less than columns
},
}
// This should fail when trying to access proof beyond index 4.
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
_, err = peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.ErrorIs(t, err, peerdas.ErrNotEnoughDataColumnSidecars)
require.ErrorContains(t, "not enough proofs", err)
})
t.Run("nominal", func(t *testing.T) {
// Create a Fulu block with blob commitments.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
commitment1 := make([]byte, 48)
commitment2 := make([]byte, 48)
// Set different values to distinguish commitments
commitment1[0] = 0x01
commitment2[0] = 0x02
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{commitment1, commitment2}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with correct dimensions.
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, numberOfColumns),
},
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, numberOfColumns),
},
}
// Set distinct values in cells and proofs for testing
for i := range numberOfColumns {
cellsAndProofs[0].Cells[i][0] = byte(i)
cellsAndProofs[0].Proofs[i][0] = byte(i)
cellsAndProofs[1].Cells[i][0] = byte(i + 128)
cellsAndProofs[1].Proofs[i][0] = byte(i + 128)
}
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
sidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.NoError(t, err)
require.NotNil(t, sidecars)
require.Equal(t, int(numberOfColumns), len(sidecars))
// Verify each sidecar has the expected structure
for i, sidecar := range sidecars {
require.Equal(t, uint64(i), sidecar.Index)
require.Equal(t, 2, len(sidecar.Column))
require.Equal(t, 2, len(sidecar.KzgCommitments))
require.Equal(t, 2, len(sidecar.KzgProofs))
// Verify commitments match what we set
require.DeepEqual(t, commitment1, sidecar.KzgCommitments[0])
require.DeepEqual(t, commitment2, sidecar.KzgCommitments[1])
// Verify column data comes from the correct cells
require.Equal(t, byte(i), sidecar.Column[0][0])
require.Equal(t, byte(i+128), sidecar.Column[1][0])
// Verify proofs come from the correct proofs
require.Equal(t, byte(i), sidecar.KzgProofs[0][0])
require.Equal(t, byte(i+128), sidecar.KzgProofs[1][0])
}
})
}
func TestDataColumnSidecarsFromColumnSidecar(t *testing.T) {
// Create KZG commitments for 2 blobs
commitment1 := make([]byte, 48)
commitment2 := make([]byte, 48)
commitment1[0] = 0x01
commitment2[0] = 0x02
kzgCommitments := [][]byte{commitment1, commitment2}
// Create column data for 2 blobs
columnData := make([][]byte, 2)
columnData[0] = make([]byte, kzg.BytesPerCell)
columnData[1] = make([]byte, kzg.BytesPerCell)
columnData[0][0] = 0x11 // Distinct values
columnData[1][0] = 0x22
// Create KZG proofs for 2 blobs
kzgProofs := make([][]byte, 2)
kzgProofs[0] = make([]byte, 48)
kzgProofs[1] = make([]byte, 48)
kzgProofs[0][0] = 0x33
kzgProofs[1][0] = 0x44
// Create inclusion proof
inclusionProof := make([][]byte, 4)
for i := range inclusionProof {
inclusionProof[i] = make([]byte, 32)
inclusionProof[i][0] = byte(i + 0x50)
}
// Create the input VerifiedRODataColumn sidecar using test utility
_, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{
{
Index: 5, // Column index 5
Column: columnData,
KzgCommitments: kzgCommitments,
KzgProofs: kzgProofs,
KzgCommitmentsInclusionProof: inclusionProof,
Slot: 42,
ProposerIndex: 7,
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
})
require.Equal(t, 1, len(verifiedSidecars))
verifiedInputSidecar := verifiedSidecars[0]
// Create cells and proofs with correct dimensions for 2 blobs
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, numberOfColumns),
},
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, numberOfColumns),
},
}
// Set distinct values in cells and proofs for testing
for i := range numberOfColumns {
cellsAndProofs[0].Cells[i][0] = byte(i)
cellsAndProofs[0].Proofs[i][0] = byte(i + 10)
cellsAndProofs[1].Cells[i][0] = byte(i + 128)
cellsAndProofs[1].Proofs[i][0] = byte(i + 138)
}
// Call the function
sidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromSidecar(verifiedInputSidecar.RODataColumn))
require.NoError(t, err)
require.NotNil(t, sidecars)
require.Equal(t, int(numberOfColumns), len(sidecars))
// Verify each sidecar has the expected structure
for i, sidecar := range sidecars {
require.Equal(t, uint64(i), sidecar.Index)
require.Equal(t, 2, len(sidecar.Column))
require.Equal(t, 2, len(sidecar.KzgCommitments))
require.Equal(t, 2, len(sidecar.KzgProofs))
// Verify commitments match input
require.DeepEqual(t, commitment1, sidecar.KzgCommitments[0])
require.DeepEqual(t, commitment2, sidecar.KzgCommitments[1])
// Verify column data comes from the correct cells
require.Equal(t, byte(i), sidecar.Column[0][0])
require.Equal(t, byte(i+128), sidecar.Column[1][0])
// Verify proofs come from the correct proofs
require.Equal(t, byte(i+10), sidecar.KzgProofs[0][0])
require.Equal(t, byte(i+138), sidecar.KzgProofs[1][0])
// Verify inclusion proof is preserved
require.Equal(t, len(inclusionProof), len(sidecar.KzgCommitmentsInclusionProof))
for j, proof := range sidecar.KzgCommitmentsInclusionProof {
require.Equal(t, byte(j+0x50), proof[0])
}
// Verify signed block header is preserved
require.Equal(t, primitives.Slot(42), sidecar.SignedBlockHeader.Header.Slot)
require.Equal(t, primitives.ValidatorIndex(7), sidecar.SignedBlockHeader.Header.ProposerIndex)
}
}

View File

@@ -25,6 +25,7 @@ go_library(
"//testing/spectest:__subpackages__",
],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/cache/depositsnapshot:go_default_library",
"//beacon-chain/core/altair:go_default_library",
@@ -103,6 +104,7 @@ go_test(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/execution/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
@@ -123,7 +124,7 @@ type Reconstructor interface {
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error)
ReconstructDataColumnSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error)
}
// EngineCaller defines a client that can interact with an Ethereum
@@ -651,22 +652,41 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
return verifiedBlobs, nil
}
// ReconstructDataColumnSidecars reconstructs the verified data column sidecars for a given beacon block.
// It retrieves the KZG commitments from the block body, fetches the associated blobs and cell proofs from the EL,
// and constructs the corresponding verified read-only data column sidecars.
func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, signedROBlock interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error) {
block := signedROBlock.Block()
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
root := populator.Root()
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", blockRoot),
"slot": block.Slot(),
"root": fmt.Sprintf("%#x", root),
"slot": populator.Slot(),
})
kzgCommitments, err := block.Body().BlobKzgCommitments()
// Fetch cells and proofs from the execution client using the KZG commitments from the sidecar.
cellsAndProofs, err := s.cellsAndProofsForCommitments(ctx, populator.Commitments())
if err != nil {
return nil, wrapWithBlockRoot(err, blockRoot, "blob KZG commitments")
return nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
}
// Return early if nothing is returned from the EL.
if len(cellsAndProofs) == 0 {
return nil, nil
}
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofs, populator)
if err != nil {
return nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
log.WithField("sourceType", populator.Type()).Debug("Data columns sidecars constructed from the execution client")
return verifiedROSidecars, nil
}
// cellsAndProofsForCommitments fetches cells and proofs from the execution client (using engine_getBlobsV2 execution API method)
func (s *Service) cellsAndProofsForCommitments(ctx context.Context, kzgCommitments [][]byte) ([]kzg.CellsAndProofs, error) {
// Collect KZG hashes for all blobs.
versionedHashes := make([]common.Hash, 0, len(kzgCommitments))
for _, commitment := range kzgCommitments {
@@ -677,12 +697,11 @@ func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, signedROBlo
// Fetch all blobsAndCellsProofs from the execution client.
blobAndProofV2s, err := s.GetBlobsV2(ctx, versionedHashes)
if err != nil {
return nil, wrapWithBlockRoot(err, blockRoot, "get blobs V2")
return nil, errors.Wrapf(err, "get blobs V2")
}
// Return early if nothing is returned from the EL.
if len(blobAndProofV2s) == 0 {
log.Debug("No blobs returned from execution client")
return nil, nil
}
@@ -690,34 +709,31 @@ func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, signedROBlo
blobs, cellProofs := make([][]byte, 0, len(blobAndProofV2s)), make([][]byte, 0, len(blobAndProofV2s))
for _, blobsAndProofs := range blobAndProofV2s {
if blobsAndProofs == nil {
return nil, wrapWithBlockRoot(errMissingBlobsAndProofsFromEL, blockRoot, "")
return nil, errMissingBlobsAndProofsFromEL
}
blobs, cellProofs = append(blobs, blobsAndProofs.Blob), append(cellProofs, blobsAndProofs.KzgProofs...)
blobs = append(blobs, blobsAndProofs.Blob)
cellProofs = append(cellProofs, blobsAndProofs.KzgProofs...)
}
// Construct the data column sidcars from the blobs and cell proofs provided by the execution client.
dataColumnSidecars, err := peerdas.ConstructDataColumnSidecars(signedROBlock, blobs, cellProofs)
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofs(blobs, cellProofs)
if err != nil {
return nil, wrapWithBlockRoot(err, blockRoot, "construct data column sidecars")
return nil, errors.Wrap(err, "compute cells and proofs")
}
// Finally, construct verified RO data column sidecars.
// We trust the execution layer we are connected to, so we can upgrade the read only data column sidecar into a verified one.
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(dataColumnSidecars))
for _, dataColumnSidecar := range dataColumnSidecars {
roDataColumn, err := blocks.NewRODataColumnWithRoot(dataColumnSidecar, blockRoot)
if err != nil {
return nil, wrapWithBlockRoot(err, blockRoot, "new read-only data column with root")
}
return cellsAndProofs, nil
}
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
// upgradeSidecarsToVerifiedSidecars upgrades a list of data column sidecars into verified data column sidecars.
func upgradeSidecarsToVerifiedSidecars(roSidecars []blocks.RODataColumn) []blocks.VerifiedRODataColumn {
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
for _, roSidecar := range roSidecars {
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roSidecar)
verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
}
log.Debug("Data columns successfully reconstructed from the execution client")
return verifiedRODataColumns, nil
return verifiedRODataColumns
}
func fullPayloadFromPayloadBody(
@@ -1009,6 +1025,6 @@ func toBlockNumArg(number *big.Int) string {
}
// wrapWithBlockRoot returns a new error with the given block root.
func wrapWithBlockRoot(err error, blockRoot [32]byte, message string) error {
func wrapWithBlockRoot(err error, blockRoot [fieldparams.RootLength]byte, message string) error {
return errors.Wrap(err, fmt.Sprintf("%s for block %#x", message, blockRoot))
}

View File

@@ -14,6 +14,7 @@ import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
mocks "github.com/OffchainLabs/prysm/v6/beacon-chain/execution/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
@@ -2556,7 +2557,7 @@ func TestReconstructBlobSidecars(t *testing.T) {
})
}
func TestReconstructDataColumnSidecars(t *testing.T) {
func TestConstructDataColumnSidecarsFromBlock(t *testing.T) {
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
@@ -2580,11 +2581,14 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
roBlock, err := blocks.NewROBlockWithRoot(sb, r)
require.NoError(t, err)
ctx := context.Background()
t.Run("GetBlobsV2 is not supported", func(t *testing.T) {
_, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
require.ErrorContains(t, "get blobs V2 for block", err)
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.ErrorContains(t, "engine_getBlobsV2 is not supported", err)
})
t.Run("nothing received", func(t *testing.T) {
@@ -2594,7 +2598,7 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 0, len(dataColumns))
})
@@ -2607,7 +2611,7 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 128, len(dataColumns))
})
@@ -2620,10 +2624,134 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.ErrorContains(t, errMissingBlobsAndProofsFromEL.Error(), err)
})
}
func TestConstructDataColumnSidecarsFromSidecar(t *testing.T) {
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
// Setup right fork epoch
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.CapellaForkEpoch = 1
cfg.DenebForkEpoch = 2
cfg.ElectraForkEpoch = 3
cfg.FuluForkEpoch = 4
params.OverrideBeaconConfig(cfg)
client := &Service{capabilityCache: &capabilityCache{}}
// Create KZG commitments for the sidecar
kzgCommitments := createRandomKzgCommitments(t, 3)
// Create a test verified data column sidecar
columnData := make([][]byte, 3)
for i := range columnData {
columnData[i] = make([]byte, kzg.BytesPerCell)
columnData[i][0] = byte(i + 0x10)
}
kzgProofs := make([][]byte, 3)
for i := range kzgProofs {
kzgProofs[i] = make([]byte, 48)
kzgProofs[i][0] = byte(i + 0x20)
}
inclusionProof := make([][]byte, 4)
for i := range inclusionProof {
inclusionProof[i] = make([]byte, 32)
inclusionProof[i][0] = byte(i + 0x30)
}
// Create the input VerifiedRODataColumn sidecar using test utility
_, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{
{
Index: 7, // Column index 7
Column: columnData,
KzgCommitments: kzgCommitments,
KzgProofs: kzgProofs,
KzgCommitmentsInclusionProof: inclusionProof,
Slot: 4 * params.BeaconConfig().SlotsPerEpoch, // Fulu epoch
ProposerIndex: 9,
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
})
require.Equal(t, 1, len(verifiedSidecars))
sidecar := verifiedSidecars[0]
ctx := context.Background()
t.Run("GetBlobsV2 is not supported", func(t *testing.T) {
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromSidecar(sidecar.RODataColumn))
require.NotNil(t, err)
require.ErrorContains(t, "engine_getBlobsV2 is not supported", err)
})
t.Run("nothing received", func(t *testing.T) {
srv := createBlobServerV2(t, 0, []bool{})
defer srv.Close()
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromSidecar(sidecar.RODataColumn))
require.NoError(t, err)
require.Equal(t, 0, len(dataColumns))
})
t.Run("receiving all blobs", func(t *testing.T) {
blobMasks := []bool{true, true, true}
srv := createBlobServerV2(t, 3, blobMasks)
defer srv.Close()
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromSidecar(sidecar.RODataColumn))
require.NoError(t, err)
require.Equal(t, 128, len(dataColumns)) // NumberOfColumns
// Verify each sidecar has expected structure
for i, sidecar := range dataColumns {
require.Equal(t, uint64(i), sidecar.Index)
require.Equal(t, 3, len(sidecar.Column)) // 3 blobs
require.Equal(t, 3, len(sidecar.KzgCommitments)) // 3 commitments
require.Equal(t, 3, len(sidecar.KzgProofs)) // 3 proofs per column
// Verify commitments are preserved
for j, commitment := range sidecar.KzgCommitments {
require.DeepEqual(t, kzgCommitments[j], commitment)
}
// Verify inclusion proof is preserved
require.Equal(t, len(inclusionProof), len(sidecar.KzgCommitmentsInclusionProof))
for j, proof := range sidecar.KzgCommitmentsInclusionProof {
require.Equal(t, byte(j+0x30), proof[0])
}
// Verify signed block header is preserved
require.Equal(t, primitives.Slot(4*params.BeaconConfig().SlotsPerEpoch), sidecar.SignedBlockHeader.Header.Slot)
require.Equal(t, primitives.ValidatorIndex(9), sidecar.SignedBlockHeader.Header.ProposerIndex)
}
})
t.Run("missing some blobs", func(t *testing.T) {
blobMasks := []bool{false, true, true}
srv := createBlobServerV2(t, 3, blobMasks)
defer srv.Close()
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromSidecar(sidecar.RODataColumn))
require.ErrorContains(t, errMissingBlobsAndProofsFromEL.Error(), err)
})
}
func createRandomKzgCommitments(t *testing.T, num int) [][]byte {

View File

@@ -14,6 +14,7 @@ go_library(
],
deps = [
"//async/event:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/execution/types:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",

View File

@@ -4,6 +4,7 @@ import (
"context"
"math/big"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -116,7 +117,8 @@ func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadO
return e.BlobSidecars, e.ErrorBlobSidecars
}
func (e *EngineClient) ReconstructDataColumnSidecars(context.Context, interfaces.ReadOnlySignedBeaconBlock, [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error) {
// ConstructDataColumnSidecars is a mock implementation of the ConstructDataColumnSidecars method.
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
return e.DataColumnSidecars, e.ErrorDataColumnSidecars
}

View File

@@ -60,6 +60,7 @@ go_library(
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/crypto/hash"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
@@ -308,19 +309,13 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
// BroadcastDataColumnSidecar broadcasts a data column to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input column subnet.
func (s *Service) BroadcastDataColumnSidecar(
root [fieldparams.RootLength]byte,
dataColumnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
dataColumnSidecar blocks.VerifiedRODataColumn,
) error {
// Add tracing to the function.
ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumnSidecar")
defer span.End()
// Ensure the data column sidecar is not nil.
if dataColumnSidecar == nil {
return errors.Errorf("attempted to broadcast nil data column sidecar at subnet %d", dataColumnSubnet)
}
// Retrieve the current fork digest.
forkDigest, err := s.currentForkDigest()
if err != nil {
@@ -330,16 +325,15 @@ func (s *Service) BroadcastDataColumnSidecar(
}
// Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
go s.internalBroadcastDataColumnSidecar(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest)
go s.internalBroadcastDataColumnSidecar(ctx, dataColumnSubnet, dataColumnSidecar, forkDigest)
return nil
}
func (s *Service) internalBroadcastDataColumnSidecar(
ctx context.Context,
root [fieldparams.RootLength]byte,
columnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
dataColumnSidecar blocks.VerifiedRODataColumn,
forkDigest [fieldparams.VersionLength]byte,
) {
// Add tracing to the function.
@@ -385,7 +379,7 @@ func (s *Service) internalBroadcastDataColumnSidecar(
log.WithFields(logrus.Fields{
"slot": slot,
"timeSinceSlotStart": time.Since(slotStartTime),
"root": fmt.Sprintf("%#x", root),
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
"columnSubnet": columnSubnet,
}).Debug("Broadcasted data column sidecar")

View File

@@ -711,13 +711,8 @@ func TestService_BroadcastDataColumn(t *testing.T) {
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topic := fmt.Sprintf(topicFormat, digest, subnet) + service.Encoding().ProtocolSuffix()
roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}})
sidecar := roSidecars[0].DataColumnSidecar
// Attempt to broadcast nil object should fail.
var emptyRoot [fieldparams.RootLength]byte
err = service.BroadcastDataColumnSidecar(emptyRoot, subnet, nil)
require.ErrorContains(t, "attempted to broadcast nil", err)
_, verifiedRoSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}})
verifiedRoSidecar := verifiedRoSidecars[0]
// Subscribe to the topic.
sub, err := p2.SubscribeToTopic(topic)
@@ -727,7 +722,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
time.Sleep(50 * time.Millisecond)
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecar(emptyRoot, subnet, sidecar)
err = service.BroadcastDataColumnSidecar(subnet, verifiedRoSidecar)
require.NoError(t, err)
// Receive the message.
@@ -739,5 +734,5 @@ func TestService_BroadcastDataColumn(t *testing.T) {
var result ethpb.DataColumnSidecar
require.NoError(t, service.Encoding().DecodeGossip(msg.Data, &result))
require.DeepEqual(t, &result, sidecar)
require.DeepEqual(t, &result, verifiedRoSidecar)
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
@@ -51,7 +52,7 @@ type (
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
BroadcastDataColumnSidecar(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
BroadcastDataColumnSidecar(columnSubnet uint64, dataColumnSidecar blocks.VerifiedRODataColumn) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.

View File

@@ -25,6 +25,7 @@ go_library(
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",

View File

@@ -6,6 +6,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
@@ -168,7 +169,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
}
// BroadcastDataColumnSidecar -- fake.
func (*FakeP2P) BroadcastDataColumnSidecar(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
func (*FakeP2P) BroadcastDataColumnSidecar(_ uint64, _ blocks.VerifiedRODataColumn) error {
return nil
}

View File

@@ -5,7 +5,7 @@ import (
"sync"
"sync/atomic"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
@@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumnSidecar([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
func (m *MockBroadcaster) BroadcastDataColumnSidecar(uint64, blocks.VerifiedRODataColumn) error {
m.BroadcastCalled.Store(true)
return nil
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
@@ -231,7 +232,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumnSidecar([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
func (p *TestP2P) BroadcastDataColumnSidecar(uint64, blocks.VerifiedRODataColumn) error {
p.BroadcastCalled.Store(true)
return nil
}

View File

@@ -220,16 +220,13 @@ func TestGetBlob(t *testing.T) {
cellsAndProofsList = append(cellsAndProofsList, cellsAndProogs)
}
dataColumnSidecarPb, err := peerdas.DataColumnSidecars(fuluBlock, cellsAndProofsList)
roDataColumnSidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofsList, peerdas.PopulateFromBlock(fuluBlock))
require.NoError(t, err)
verifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, len(dataColumnSidecarPb))
for _, sidecarPb := range dataColumnSidecarPb {
roDataColumn, err := blocks.NewRODataColumnWithRoot(sidecarPb, fuluBlockRoot)
require.NoError(t, err)
verifiedRoDataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumn)
verifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roDataColumnSidecars))
for _, roDataColumnSidecar := range roDataColumnSidecars {
verifiedRoDataColumnSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar)
verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumnSidecar)
}
err = db.SaveBlock(t.Context(), fuluBlock)

View File

@@ -281,7 +281,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
var (
blobSidecars []*ethpb.BlobSidecar
dataColumnSidecars []*ethpb.DataColumnSidecar
dataColumnSidecars []blocks.RODataColumn
)
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
@@ -309,10 +309,11 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
rob, err := blocks.NewROBlockWithRoot(block, root)
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(block, req)
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
@@ -348,10 +349,10 @@ func (vs *Server) broadcastAndReceiveSidecars(
block interfaces.SignedBeaconBlock,
root [fieldparams.RootLength]byte,
blobSidecars []*ethpb.BlobSidecar,
dataColumnSideCars []*ethpb.DataColumnSidecar,
dataColumnSidecars []blocks.RODataColumn,
) error {
if block.Version() >= version.Fulu {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSideCars, root); err != nil {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, root); err != nil {
return errors.Wrap(err, "broadcast and receive data columns")
}
return nil
@@ -398,21 +399,28 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
}
func (vs *Server) handleUnblindedBlock(
block interfaces.SignedBeaconBlock,
block blocks.ROBlock,
req *ethpb.GenericSignedBeaconBlock,
) ([]*ethpb.BlobSidecar, []*ethpb.DataColumnSidecar, error) {
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
rawBlobs, proofs, err := blobsAndProofs(req)
if err != nil {
return nil, nil, err
}
if block.Version() >= version.Fulu {
dataColumnSideCars, err := peerdas.ConstructDataColumnSidecars(block, rawBlobs, proofs)
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofs(rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "construct data column sidecars")
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
return nil, dataColumnSideCars, nil
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, errors.Wrap(err, "data column sidcars")
}
return nil, roDataColumnSidecars, nil
}
blobSidecars, err := BuildBlobSidecars(block, rawBlobs, proofs)
@@ -468,26 +476,21 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
// broadcastAndReceiveDataColumns handles the broadcasting and reception of data columns sidecars.
func (vs *Server) broadcastAndReceiveDataColumns(
ctx context.Context,
sidecars []*ethpb.DataColumnSidecar,
roSidecars []blocks.RODataColumn,
root [fieldparams.RootLength]byte,
) error {
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(sidecars))
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
eg, _ := errgroup.WithContext(ctx)
for _, sidecar := range sidecars {
roDataColumn, err := blocks.NewRODataColumnWithRoot(sidecar, root)
if err != nil {
return errors.Wrap(err, "new read-only data column with root")
}
for _, roSidecar := range roSidecars {
// We build this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roSidecar)
verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
eg.Go(func() error {
// Compute the subnet index based on the column index.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
subnet := peerdas.ComputeSubnetForDataColumnSidecar(roSidecar.Index)
if err := vs.P2P.BroadcastDataColumnSidecar(root, subnet, sidecar); err != nil {
if err := vs.P2P.BroadcastDataColumnSidecar(subnet, verifiedRODataColumn); err != nil {
return errors.Wrap(err, "broadcast data column")
}

View File

@@ -10,7 +10,7 @@ import (
)
// BuildBlobSidecars given a block, builds the blob sidecars for the block.
func BuildBlobSidecars(blk interfaces.SignedBeaconBlock, blobs [][]byte, kzgProofs [][]byte) ([]*ethpb.BlobSidecar, error) {
func BuildBlobSidecars(blk interfaces.ReadOnlySignedBeaconBlock, blobs [][]byte, kzgProofs [][]byte) ([]*ethpb.BlobSidecar, error) {
if blk.Version() < version.Deneb {
return nil, nil // No blobs before deneb.
}

View File

@@ -155,6 +155,7 @@ go_library(
"@com_github_trailofbits_go_mutexasserts//:go_default_library",
"@io_opentelemetry_go_otel_trace//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//singleflight:go_default_library",
],
)

View File

@@ -25,15 +25,14 @@ const (
// all data column sidecars. Then, it saves missing sidecars to the store.
// After a delay, it broadcasts in the background not seen via gossip
// (but reconstructed) sidecars.
func (s *Service) reconstructSaveBroadcastDataColumnSidecars(
ctx context.Context,
slot primitives.Slot,
proposerIndex primitives.ValidatorIndex,
root [fieldparams.RootLength]byte,
) error {
func (s *Service) reconstructSaveBroadcastDataColumnSidecars(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
startTime := time.Now()
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
root := sidecar.BlockRoot()
slot := sidecar.Slot()
proposerIndex := sidecar.ProposerIndex()
// Lock to prevent concurrent reconstructions.
s.reconstructionLock.Lock()
defer s.reconstructionLock.Unlock()
@@ -198,7 +197,7 @@ func (s *Service) broadcastMissingDataColumnSidecars(
subnet := peerdas.ComputeSubnetForDataColumnSidecar(verifiedRODataColumn.Index)
// Broadcast the missing data column.
if err := s.cfg.p2p.BroadcastDataColumnSidecar(root, subnet, verifiedRODataColumn.DataColumnSidecar); err != nil {
if err := s.cfg.p2p.BroadcastDataColumnSidecar(subnet, verifiedRODataColumn); err != nil {
log.WithError(err).Error("Broadcast data column")
}
@@ -220,3 +219,203 @@ func (s *Service) broadcastMissingDataColumnSidecars(
return nil
}
// processDataColumnSidecarsFromExecutionFromBlock retrieves (if available) data column sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processDataColumnSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) error {
const delay = 250 * time.Millisecond
secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
numberOfColumns := params.BeaconConfig().NumberOfColumns
root := roBlock.Root()
block := roBlock.Block()
slot := block.Slot()
proposerIndex := block.ProposerIndex()
commitments, err := block.Body().BlobKzgCommitments()
if err != nil {
return errors.Wrap(err, "blob kzg commitments")
}
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
for {
// Check if some data column sidecars to custody are missing.
missingIndices, err := s.missingDataColumnSidecars(root, commitments)
if err != nil {
return errors.Wrap(err, "missing data column sidecars")
}
// Return early if all needed data column sidecars are already available in storage.
if len(missingIndices) == 0 {
return nil
}
// Try to reconstruct data column sidecars from the execution client.
sidecars, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
if err != nil {
return errors.Wrap(err, "reconstruct data column sidecars")
}
// No sidecars are retrieved from the EL, retry later
sidecarCount := uint64(len(sidecars))
if sidecarCount == 0 {
if ctx.Err() != nil {
return ctx.Err()
}
time.Sleep(delay)
continue
}
// Boundary check.
if sidecarCount != numberOfColumns {
return errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", sidecarCount, numberOfColumns)
}
// Broadcast and save data column sidecars to custody but not yet received.
for index := range missingIndices {
if index >= sidecarCount {
return errors.Errorf("data column index %d >= sidecar count %d - should never happen", index, sidecarCount)
}
// This sidecar has been received in the meantime, skip it.
if s.hasSeenDataColumnIndex(slot, proposerIndex, index) {
continue
}
sidecar := sidecars[index]
if err := s.cfg.p2p.BroadcastDataColumnSidecar(sidecar.Index, sidecar); err != nil {
return errors.Wrap(err, "broadcast data column sidecar")
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return errors.Wrap(err, "receive data column sidecar")
}
}
return nil
}
}
func (s *Service) processDataColumnSidecarsFromExecutionFromColumnSidecar(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
const delay = 250 * time.Millisecond
secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
numberOfColumns := params.BeaconConfig().NumberOfColumns
commitments := sidecar.KzgCommitments
root := sidecar.BlockRoot()
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
for {
// Check if some data column sidecars to custody are missing.
missingIndices, err := s.missingDataColumnSidecars(root, commitments)
if err != nil {
return errors.Wrap(err, "missing data column sidecars")
}
// Return early if all needed data column sidecars are already available in storage.
if len(missingIndices) == 0 {
return nil
}
// Try to reconstruct data column sidecars from the execution client.
sidecars, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromSidecar(sidecar.RODataColumn))
if err != nil {
return errors.Wrap(err, "reconstruct data column sidecars")
}
// No sidecars are retrieved from the EL, retry later
sidecarCount := uint64(len(sidecars))
if sidecarCount == 0 {
if ctx.Err() != nil {
return ctx.Err()
}
time.Sleep(delay)
continue
}
// Boundary check.
if sidecarCount != numberOfColumns {
return errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", sidecarCount, numberOfColumns)
}
blockSlot, proposerIndex := sidecar.Slot(), sidecar.ProposerIndex()
// Broadcast and save data column sidecars to custody but not yet received.
for index := range missingIndices {
log := log.WithField("columnIndex", index)
if index >= sidecarCount {
return errors.Errorf("data column index %d >= sidecar count %d - should never happen", index, sidecarCount)
}
// This sidecar has been received in the meantime, skip it.
if s.hasSeenDataColumnIndex(blockSlot, proposerIndex, index) {
continue
}
sidecar := sidecars[index]
if err := s.cfg.p2p.BroadcastDataColumnSidecar(sidecar.Index, sidecar); err != nil {
log.WithError(err).Error("Failed to broadcast data column")
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
log.WithError(err).Error("Failed to receive data column")
}
}
return nil
}
}
// missingDataColumnSidecars returns the data column indices we should custody and that are missing in our storage
// for the given read only beacon block.
func (s *Service) missingDataColumnSidecars(root [fieldparams.RootLength]byte, commitments [][]byte) (map[uint64]bool, error) {
// Return early if there is not commitments.
if len(commitments) == 0 {
return nil, nil
}
// Retrieve our node ID.
nodeID := s.cfg.p2p.NodeID()
// Get the custody group sampling size for the node.
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount()
if err != nil {
return nil, errors.Wrap(err, "custody group count")
}
// Compute the sampling size.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
samplingSize := max(samplesPerSlot, custodyGroupCount)
// Get the peer info for the node.
peerInfo, _, err := peerdas.Info(nodeID, samplingSize)
if err != nil {
return nil, errors.Wrap(err, "peer info")
}
// Get the indices of the data column sidecars we have in the store.
storedIndices := s.cfg.dataColumnStorage.Summary(root).Stored()
// List indices we should custody and that are missing in our storage.
missingIndices := make(map[uint64]bool, samplingSize)
for index := range peerInfo.CustodyColumns {
if !storedIndices[index] {
missingIndices[index] = true
}
}
return missingIndices, nil
}

View File

@@ -4,10 +4,14 @@ import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
chainMock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
mockExecution "github.com/OffchainLabs/prysm/v6/beacon-chain/execution/testing"
mockp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/testing/require"
@@ -27,9 +31,6 @@ func TestReconstructDataColumns(t *testing.T) {
roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns)))
root, block := roBlock.Root(), roBlock.Block()
slot, proposerIndex := block.Slot(), block.ProposerIndex()
minimumCount := peerdas.MinimumColumnCountToReconstruct()
t.Run("not enough stored sidecars", func(t *testing.T) {
@@ -38,7 +39,7 @@ func TestReconstructDataColumns(t *testing.T) {
require.NoError(t, err)
service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)), WithDataColumnStorage(storage))
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, verifiedRoDataColumns[0])
require.NoError(t, err)
})
@@ -48,7 +49,7 @@ func TestReconstructDataColumns(t *testing.T) {
require.NoError(t, err)
service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)), WithDataColumnStorage(storage))
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, verifiedRoDataColumns[0])
require.NoError(t, err)
})
@@ -72,7 +73,7 @@ func TestReconstructDataColumns(t *testing.T) {
WithChainService(&mockChain.ChainService{}),
)
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, verifiedRoDataColumns[0])
require.NoError(t, err)
expected := make(map[uint64]bool, minimumCount+cgc)
@@ -85,7 +86,7 @@ func TestReconstructDataColumns(t *testing.T) {
expected[i] = true
}
summary := storage.Summary(root)
summary := storage.Summary(roBlock.Root())
actual := summary.Stored()
require.Equal(t, len(expected), len(actual))
@@ -175,3 +176,111 @@ func TestBroadcastMissingDataColumnSidecars(t *testing.T) {
require.Equal(t, true, p2p.BroadcastCalled.Load())
})
}
func TestMissingDataColumnSidecars(t *testing.T) {
ctx := t.Context()
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
t.Run("no commitments", func(t *testing.T) {
service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)))
root := [fieldparams.RootLength]byte{0x01, 0x02, 0x03} // Some test root
commitments := [][]byte{}
missing, err := service.missingDataColumnSidecars(root, commitments)
require.NoError(t, err)
require.Equal(t, 0, len(missing))
})
t.Run("some sidecars missing", func(t *testing.T) {
const (
blobCount = 2
cgc = 8 // custody group count
)
// Generate test data
roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
root := roBlock.Root()
// Create commitments from the block
commitments, err := roBlock.Block().Body().BlobKzgCommitments()
require.NoError(t, err)
// Setup storage with only some of the sidecars
storage := filesystem.NewEphemeralDataColumnStorage(t)
p2p := p2ptest.NewTestP2P(t)
service := NewService(ctx, WithP2P(p2p), WithDataColumnStorage(storage))
// Update custody info to set custody group count
_, _, err = service.cfg.p2p.UpdateCustodyInfo(0, cgc)
require.NoError(t, err)
// Save only some of the sidecars that the node should custody
// The node should custody indices: [1, 17, 19, 42, 75, 87, 102, 117]
// Save only indices 1, 42, and 102
storedIndices := []uint64{1, 42, 102}
toSave := make([]blocks.VerifiedRODataColumn, 0, len(storedIndices))
for _, index := range storedIndices {
toSave = append(toSave, verifiedRoDataColumns[index])
}
err = storage.Save(toSave)
require.NoError(t, err)
// Test function
missing, err := service.missingDataColumnSidecars(root, commitments)
require.NoError(t, err)
// Should be missing indices: 17, 19, 75, 87, 117
expectedMissing := map[uint64]bool{17: true, 19: true, 75: true, 87: true, 117: true}
require.Equal(t, len(expectedMissing), len(missing))
for index := range expectedMissing {
require.Equal(t, true, missing[index], "Index %d should be missing", index)
}
// Should NOT be missing stored indices
for _, storedIndex := range storedIndices {
require.Equal(t, false, missing[storedIndex], "Index %d should not be missing", storedIndex)
}
})
}
func TestProcessDataColumnSidecarsFromExecutionFromColumnSidecar(t *testing.T) {
const (
earliestAvailableSlot = 0
custodyGroupCount = 8
blobCount = 2
)
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
_, _, verifiedRoSidecars := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
p2p := mockp2p.NewTestP2P(t)
_, _, err = p2p.UpdateCustodyInfo(0, custodyGroupCount)
require.NoError(t, err)
executionReconstructor := &mockExecution.EngineClient{
DataColumnSidecars: verifiedRoSidecars,
}
chain := new(chainMock.ChainService)
service := Service{
cfg: &config{
p2p: p2p,
dataColumnStorage: filesystem.NewEphemeralDataColumnStorage(t),
executionReconstructor: executionReconstructor,
chain: chain,
operationNotifier: new(chainMock.MockOperationNotifier),
},
seenDataColumnCache: newSlotAwareCache(1),
}
err = service.processDataColumnSidecarsFromExecutionFromColumnSidecar(t.Context(), verifiedRoSidecars[0])
require.NoError(t, err)
require.Equal(t, custodyGroupCount, len(chain.DataColumns))
}

View File

@@ -50,6 +50,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
"golang.org/x/sync/singleflight"
)
var _ runtime.Service = (*Service)(nil)
@@ -169,6 +170,7 @@ type Service struct {
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
newColumnsVerifier verification.NewDataColumnsVerifier
columnSidecarsExecSingleFlight singleflight.Group
availableBlocker coverage.AvailableBlocker
reconstructionLock sync.Mutex
reconstructionRandGen *rand.Rand

View File

@@ -7,7 +7,6 @@ import (
"path"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition/interop"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -15,7 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v6/io/file"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/sirupsen/logrus"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
@@ -37,7 +36,11 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return err
}
go s.processSidecarsFromExecution(ctx, signed)
roBlock, err := blocks.NewROBlockWithRoot(signed, root)
if err != nil {
return errors.Wrap(err, "new roblock with root")
}
go s.processSidecarsFromExecutionFromBlock(ctx, roBlock)
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
if blockchain.IsInvalidBlock(err) {
@@ -61,11 +64,23 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return err
}
// processSidecarsFromExecution retrieves (if available) sidecars data from the execution client,
// processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecution(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, block blocks.ROBlock) {
if block.Version() >= version.Fulu {
s.processDataColumnSidecarsFromExecution(ctx, block)
key := fmt.Sprintf("%#x", block.Root())
if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (interface{}, error) {
if err := s.processDataColumnSidecarsFromExecutionFromBlock(ctx, block); err != nil {
return nil, err
}
return nil, nil
}); err != nil {
log.WithError(err).Error("Failed to process data column sidecars from execution")
return
}
return
}
@@ -75,92 +90,6 @@ func (s *Service) processSidecarsFromExecution(ctx context.Context, block interf
}
}
// processDataColumnSidecarsFromExecution retrieves (if available) data column sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, roSignedBlock interfaces.ReadOnlySignedBeaconBlock) {
block := roSignedBlock.Block()
log := log.WithFields(logrus.Fields{
"slot": block.Slot(),
"proposerIndex": block.ProposerIndex(),
})
kzgCommitments, err := block.Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to read commitments from block")
return
}
if len(kzgCommitments) == 0 {
// No blobs to reconstruct.
return
}
blockRoot, err := block.HashTreeRoot()
if err != nil {
log.WithError(err).Error("Failed to calculate block root")
return
}
log = log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot))
if s.cfg.dataColumnStorage == nil {
log.Warning("Data column storage is not enabled, skip saving data column, but continue to reconstruct and broadcast data column")
}
// When this function is called, it's from the time when the block is received, so in almost all situations we need to get the data column from EL instead of the blob storage.
sidecars, err := s.cfg.executionReconstructor.ReconstructDataColumnSidecars(ctx, roSignedBlock, blockRoot)
if err != nil {
log.WithError(err).Debug("Cannot reconstruct data column sidecars after receiving the block")
return
}
// Return early if no blobs are retrieved from the EL.
if len(sidecars) == 0 {
return
}
nodeID := s.cfg.p2p.NodeID()
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount()
if err != nil {
log.WithError(err).Error("Failed to get custody group count")
return
}
info, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
log.WithError(err).Error("Failed to get peer info")
return
}
blockSlot := block.Slot()
proposerIndex := block.ProposerIndex()
// Broadcast and save data column sidecars to custody but not yet received.
sidecarCount := uint64(len(sidecars))
for columnIndex := range info.CustodyColumns {
log := log.WithField("columnIndex", columnIndex)
if columnIndex >= sidecarCount {
log.Error("Column custody index out of range - should never happen")
continue
}
if s.hasSeenDataColumnIndex(blockSlot, proposerIndex, columnIndex) {
continue
}
sidecar := sidecars[columnIndex]
if err := s.cfg.p2p.BroadcastDataColumnSidecar(blockRoot, sidecar.Index, sidecar.DataColumnSidecar); err != nil {
log.WithError(err).Error("Failed to broadcast data column")
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
log.WithError(err).Error("Failed to receive data column")
}
}
}
// processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processBlobSidecarsFromExecution(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {

View File

@@ -135,7 +135,7 @@ func TestService_BeaconBlockSubscribe_UndefinedEeError(t *testing.T) {
require.Equal(t, 1, len(s.seenBlockCache.Keys()))
}
func TestReconstructAndBroadcastBlobs(t *testing.T) {
func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
t.Run("blobs", func(t *testing.T) {
rob, err := blocks.NewROBlob(
&ethpb.BlobSidecar{
@@ -192,7 +192,9 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
},
seenBlobCache: lruwrpr.New(1),
}
s.processSidecarsFromExecution(context.Background(), sb)
rob, err := blocks.NewROBlock(sb)
require.NoError(t, err)
s.processSidecarsFromExecutionFromBlock(context.Background(), rob)
require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
})
}
@@ -253,7 +255,7 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
name: "Constructed 128 data columns with all blobs",
blobCount: 1,
dataColumnSidecars: allColumns,
expectedDataColumnCount: 4, // default is 4
expectedDataColumnCount: 8,
},
}
@@ -261,10 +263,10 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := Service{
cfg: &config{
p2p: mockp2p.NewTestP2P(t),
chain: chainService,
clock: startup.NewClock(time.Now(), [32]byte{}),
blobStorage: filesystem.NewEphemeralBlobStorage(t),
p2p: mockp2p.NewTestP2P(t),
chain: chainService,
clock: startup.NewClock(time.Now(), [32]byte{}),
dataColumnStorage: filesystem.NewEphemeralDataColumnStorage(t),
executionReconstructor: &mockExecution.EngineClient{
DataColumnSidecars: tt.dataColumnSidecars,
},
@@ -288,10 +290,11 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
s.processSidecarsFromExecution(context.Background(), sb)
rob, err := blocks.NewROBlock(sb)
require.NoError(t, err)
s.processSidecarsFromExecutionFromBlock(t.Context(), rob)
require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns))
})
}
})
}

View File

@@ -21,14 +21,21 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return errors.Wrap(err, "receive data column sidecar")
}
slot := sidecar.Slot()
proposerIndex := sidecar.ProposerIndex()
root := sidecar.BlockRoot()
if err := s.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root); err != nil {
if err := s.reconstructSaveBroadcastDataColumnSidecars(ctx, sidecar); err != nil {
return errors.Wrap(err, "reconstruct/save/broadcast data column sidecars")
}
key := fmt.Sprintf("%#x", sidecar.BlockRoot())
if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (interface{}, error) {
if err := s.processDataColumnSidecarsFromExecutionFromColumnSidecar(ctx, sidecar); err != nil {
return nil, err
}
return nil, nil
}); err != nil {
return errors.Wrap(err, "process data column sidecars from execution from sidecar")
}
return nil
}

View File

@@ -30,17 +30,10 @@ func GenerateTestDataColumns(t *testing.T, parent [fieldparams.RootLength]byte,
}
cellsAndProofs := util.GenerateCellsAndProofs(t, blobs)
dataColumnSidecars, err := peerdas.DataColumnSidecars(roBlock, cellsAndProofs)
roDataColumnSidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
roDataColumns := make([]blocks.RODataColumn, 0, len(dataColumnSidecars))
for i := range dataColumnSidecars {
roDataColumn, err := blocks.NewRODataColumn(dataColumnSidecars[i])
require.NoError(t, err)
roDataColumns = append(roDataColumns, roDataColumn)
}
return roDataColumns
return roDataColumnSidecars
}
func TestColumnSatisfyRequirement(t *testing.T) {

View File

@@ -0,0 +1,3 @@
### Added
- Add retry logic when GetBlobsV2 is called.
- Call GetBlobsV2 as soon as we receive the first data column sidecar or block

View File

@@ -2076,8 +2076,8 @@
- name: get_data_column_sidecars
sources:
- file: beacon-chain/core/peerdas/das_core.go
search: func DataColumnSidecars(
- file: beacon-chain/core/peerdas/validator.go
search: func dataColumnSidecars(
spec: |
<spec fn="get_data_column_sidecars" fork="fulu" hash="317fc596">
def get_data_column_sidecars(
@@ -2115,8 +2115,8 @@
- name: get_data_column_sidecars_from_block
sources:
- file: beacon-chain/core/peerdas/das_core.go
search: func dataColumnsSidecars(
- file: beacon-chain/core/peerdas/validator.go
search: func DataColumnSidecarsFromBlock(
spec: |
<spec fn="get_data_column_sidecars_from_block" fork="fulu" hash="02ffae23">
def get_data_column_sidecars_from_block(
@@ -2144,7 +2144,9 @@
</spec>
- name: get_data_column_sidecars_from_column_sidecar
sources: []
sources:
- file: beacon-chain/core/peerdas/validator.go
search: func DataColumnSidecarsFromColumnSidecar(
spec: |
<spec fn="get_data_column_sidecars_from_column_sidecar" fork="fulu" hash="4304cdec">
def get_data_column_sidecars_from_column_sidecar(

View File

@@ -159,15 +159,13 @@ func GenerateTestFuluBlockWithSidecars(t *testing.T, blobCount int, options ...F
cellsAndProofs := GenerateCellsAndProofs(t, blobs)
sidecars, err := peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
rob, err := blocks.NewROBlockWithRoot(signedBeaconBlock, root)
require.NoError(t, err)
roSidecars, err := peerdas.ConstructDataColumnSidecar(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.NoError(t, err)
roSidecars := make([]blocks.RODataColumn, 0, len(sidecars))
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(sidecars))
for _, sidecar := range sidecars {
roSidecar, err := blocks.NewRODataColumnWithRoot(sidecar, root)
require.NoError(t, err)
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
for _, roSidecar := range roSidecars {
roVerifiedSidecar := blocks.NewVerifiedRODataColumn(roSidecar)
roSidecars = append(roSidecars, roSidecar)