Compare commits

...

83 Commits

Author SHA1 Message Date
Manu NALEPA
81e2053cc5 Implement and use broadcastAndReceiveUnseenDataColumnSidecars. 2025-09-17 10:39:02 +02:00
Manu NALEPA
212d2f63e4 processDataColumnSidecarsFromExecution: Use receiveDataColumnSidecars instead of receiveDataColumnSidecar. 2025-09-17 10:24:34 +02:00
Manu NALEPA
2472864436 processDataColumnSidecarsFromReconstruction: Guard against a single flight.
In `dataColumnSubscriber`, trig in parallel `processDataColumnSidecarsFromReconstruction` and `processDataColumnSidecarsFromExecution`.

Stop when the first of them is successful.
2025-09-17 10:09:40 +02:00
Manu NALEPA
0b971a19f8 processDataColumnSidecarsFromExecution: Use single flight directly in the function.
So the caller does not have any more the responsability to deal with multiple simultaneous calls.
2025-09-17 09:37:29 +02:00
Manu NALEPA
d881866c6a dataColumnSubscriber: Add godoc and remove only once used variable. 2025-09-16 22:31:09 +02:00
Manu NALEPA
eb23992090 Implement receiveDataColumnSidecars and transform receiveDataColumnSidecar as a subcase of the plural version. 2025-09-16 22:30:44 +02:00
Manu NALEPA
ed7516e998 DataColumnStorage.Save: Remove wrong godoc. 2025-09-16 22:29:39 +02:00
Manu NALEPA
e4bd3b24fa Rename custodyColumns ==> columnIndicesToSample. 2025-09-16 14:17:54 +02:00
Manu NALEPA
ff196bffcc reconstructSaveBroadcastDataColumnSidecars: Use the s.columnIndicesToSample instead of recoding its content.
ddd
2025-09-16 14:17:38 +02:00
Manu NALEPA
97ee3e952c Merge branch 'develop' into peerDAS-getBlobsV2 2025-09-15 22:18:28 +02:00
Manu NALEPA
bcf1997723 processDataColumnSidecarsFromExecution: Stop using DB cache. 2025-09-15 22:16:45 +02:00
Manu NALEPA
dbf9d746ad fetchCellsAndProofsFromExecution: Avoid useless flattening. 2025-09-15 17:21:43 +02:00
Manu NALEPA
75de2e606a Move functions to be consistent with blobs. 2025-09-15 14:29:24 +02:00
Manu NALEPA
2c689e721b Fix tests 2025-09-15 14:21:54 +02:00
Manu NALEPA
6b1cf4fb5a ConstructionPopulator: Add tests. 2025-09-15 13:50:25 +02:00
Manu NALEPA
17096ea934 Remove useless tests. 2025-09-15 12:10:27 +02:00
Manu NALEPA
7f4e72071e ReceiveBlock: Wrap errors. 2025-09-15 12:03:23 +02:00
Manu NALEPA
b3f3c1a289 Merge branch 'develop' into peerDAS-getBlobsV2 2025-09-15 11:56:45 +02:00
Manu NALEPA
2bfa21dbc0 Implement a unique processDataColumnSidecarsFromExecution instead 2 separate functions from block and from sidecar. 2025-09-15 11:53:48 +02:00
Manu NALEPA
8d49bc913f Replace mutating public method (but only internally used) Populate but private not mutating method extract. 2025-09-15 11:16:02 +02:00
Manu NALEPA
be5066637e - Add godocs
- Rename some functions to be closer to the spec
- Add err in return of commitments
2025-09-15 10:43:43 +02:00
Kasey Kirkham
9aeb89cedd refactor to deduplicate sidecar construction code 2025-09-12 23:14:27 -05:00
Manu NALEPA
81d413ca73 Merge branch 'develop' into peerDAS-getBlobsV2 2025-09-11 16:04:34 +02:00
satushh
a55d61de9c remove redundant flag option 2025-08-29 19:02:51 +01:00
satushh
4faeac57e1 fix bad merge 2025-08-29 18:58:00 +01:00
satushh
16bed0a93f correct bad merge 2025-08-29 18:45:27 +01:00
satushh
0e9d5f4513 tidy up IsDataAvailable 2025-08-29 18:30:15 +01:00
satushh
65d6be0406 put samplesPerSlot at appropriate place 2025-08-29 18:23:34 +01:00
satushh
115e20de62 fix more tests 2025-08-29 18:23:34 +01:00
satushh
fb4028f736 fix test 2025-08-29 18:23:34 +01:00
satushh
af7470e835 remove redundant function and fix name 2025-08-29 18:23:34 +01:00
satushh
618fe1d1b2 blockchain: get variable samplesPerSlot only when required 2025-08-29 18:23:34 +01:00
satushh
71fe640eef execution: use service context instead of function's for retry 2025-08-29 18:23:34 +01:00
satushh
350fead19b execution: edge case - delete activeRetries on success 2025-08-29 18:23:34 +01:00
satushh
23645e549a sync: new appropriate mock service 2025-08-29 18:23:34 +01:00
satushh
2abe1adb29 sync: fix lint, test and add extra test for when data is actually not available 2025-08-29 18:23:34 +01:00
satushh
c9f3e111e6 lint: formatting and remove confusing comment 2025-08-29 18:23:34 +01:00
satushh
f8410908e9 blockchain: cleaner DA check 2025-08-29 18:23:34 +01:00
satushh
4c328977d9 execution: make reconstructSingleflight part of the service struct 2025-08-29 18:23:34 +01:00
satushh
b98a875194 blockchain: move IsDataAvailable interface to blockchain package 2025-08-29 18:23:34 +01:00
satushh
ee3a7b980a sync: don't call ReconstructDataColumnSidecars if not required 2025-08-29 18:23:34 +01:00
satushh
62dfe6bde5 execution: ensure single responsibility, execution should not do DA check 2025-08-29 18:23:34 +01:00
satushh
3a671abaa0 execution: ensure the retry actually happens when it needs to 2025-08-29 18:23:34 +01:00
satushh
ae6ca74c24 lint: format 2025-08-29 18:23:34 +01:00
satushh
bdd423e100 execution: retry logic inside ReconstructDataColumnSidecars itself 2025-08-29 18:23:34 +01:00
satushh
b9e31f258b lint: lint and use unused metrics 2025-08-29 18:23:34 +01:00
satushh
7c1eef5780 lint: formatting 2025-08-29 18:23:34 +01:00
satushh
88620a1a63 blockchain: fix CustodyGroupCount return 2025-08-29 18:23:34 +01:00
satushh
3c9a0b9511 bazel: bazel run //:gazelle -- fix 2025-08-29 18:23:34 +01:00
satushh
a8c5cfd871 sync: remove unwanted tests 2025-08-29 18:23:34 +01:00
satushh
d3b2122542 da: updated IsDataAvailable 2025-08-29 18:23:34 +01:00
satushh
a12b00c71c execution: retry atomicity test 2025-08-29 18:23:34 +01:00
satushh
358b9acc53 execution: fix test 2025-08-29 18:23:34 +01:00
satushh
aa31a16a9c sync: remove unwanted checks 2025-08-29 18:23:34 +01:00
satushh
86bbb6ee3b da: non blocking checks 2025-08-29 18:23:34 +01:00
satushh
139c2b887c exec: hardcode retry interval 2025-08-29 18:23:34 +01:00
satushh
88b03089d2 sync: no goroutine, getblobsv2 in absence of block as well, wrap error 2025-08-29 18:23:34 +01:00
satushh
bce0960422 engine: remove isDataAlreadyAvailable function 2025-08-29 18:23:34 +01:00
satushh
58f196c38c reconstruct: simplify multi goroutine case and avoid race condition 2025-08-29 18:23:34 +01:00
satushh
de63e24815 reconstruct: load once, correctly deliver the result to all waiting goroutines 2025-08-29 18:23:34 +01:00
satushh
c851987e49 beacon: default retry interval 2025-08-29 18:23:34 +01:00
satushh
141f9adff3 lint: remove unused field 2025-08-29 18:23:34 +01:00
satushh
47bf2bd985 sidecar: recover function and different context for retrying 2025-08-29 18:23:34 +01:00
satushh
fb28a17fd5 config: make retry interval configurable 2025-08-29 18:23:34 +01:00
satushh
338d4c29d2 lint: return error when it is not nil 2025-08-29 18:23:34 +01:00
satushh
35f6d1277d lint: fmt and log capitalisation 2025-08-29 18:23:34 +01:00
satushh
77b3d23c86 test: engine client and sync package, metrics 2025-08-29 18:23:34 +01:00
satushh
32e81a33dd getBlobsV2: retry if reconstruction isnt successful 2025-08-29 18:23:34 +01:00
Manu NALEPA
54ce47a839 Add DataColumnStorage and SubscribeAllDataSubnets flag. 2025-08-29 18:23:34 +01:00
Manu NALEPA
208febf884 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
b3c9cc2459 selectPeers: Avoid map with key but empty value. 2025-08-29 18:23:34 +01:00
Manu NALEPA
d8895ec49c Revert "Fix James' comment."
This reverts commit a3f919205a.
2025-08-29 18:23:34 +01:00
Manu NALEPA
5e593d50eb Revert "Fix Potuz's comment."
This reverts commit c45230b455.
2025-08-29 18:23:34 +01:00
Manu NALEPA
4878468047 Fix flakiness in TestSelectPeers. 2025-08-29 18:23:34 +01:00
Manu NALEPA
e10045cf8c Fix James' comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
b18c4b99c7 Implement TestSelectPeers. 2025-08-29 18:23:34 +01:00
Manu NALEPA
272a95934f Implement TestFetchDataColumnSidecarsFromPeers. 2025-08-29 18:23:34 +01:00
Manu NALEPA
e4582ed1e2 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
f2cc2e7128 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
84b1bc7635 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
c51a8f5965 Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
fdb858473e Fix Potuz's comment. 2025-08-29 18:23:34 +01:00
Manu NALEPA
e2356bb7ad PeerDAS: Implement sync 2025-08-29 18:23:34 +01:00
43 changed files with 1385 additions and 1004 deletions

View File

@@ -14,7 +14,10 @@ const BytesPerBlob = ckzg4844.BytesPerBlob
type Blob [BytesPerBlob]byte
// BytesPerCell is the number of bytes in a single cell.
const BytesPerCell = ckzg4844.BytesPerCell
const (
BytesPerCell = ckzg4844.BytesPerCell
BytesPerProof = ckzg4844.BytesPerProof
)
// Cell represents a chunk of an encoded Blob.
type Cell [BytesPerCell]byte
@@ -23,7 +26,7 @@ type Cell [BytesPerCell]byte
type Commitment [48]byte
// Proof represents a KZG proof that attests to the validity of a Blob or parts of it.
type Proof [48]byte
type Proof [BytesPerProof]byte
// Bytes48 is a 48-byte array.
type Bytes48 = ckzg4844.Bytes48

View File

@@ -664,14 +664,14 @@ func missingDataColumnIndices(store *filesystem.DataColumnStorage, root [fieldpa
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
func (s *Service) isDataAvailable(
ctx context.Context,
root [fieldparams.RootLength]byte,
signedBlock interfaces.ReadOnlySignedBeaconBlock,
roBlock consensusblocks.ROBlock,
) error {
block := signedBlock.Block()
block := roBlock.Block()
if block == nil {
return errors.New("invalid nil beacon block")
}
root := roBlock.Root()
blockVersion := block.Version()
if blockVersion >= version.Fulu {
return s.areDataColumnsAvailable(ctx, root, block)
@@ -691,8 +691,6 @@ func (s *Service) areDataColumnsAvailable(
root [fieldparams.RootLength]byte,
block interfaces.ReadOnlyBeaconBlock,
) error {
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
// We are only required to check within MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
blockSlot, currentSlot := block.Slot(), s.CurrentSlot()
blockEpoch, currentEpoch := slots.ToEpoch(blockSlot), slots.ToEpoch(currentSlot)
@@ -726,6 +724,7 @@ func (s *Service) areDataColumnsAvailable(
// Compute the sampling size.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
samplingSize := max(samplesPerSlot, custodyGroupCount)
// Get the peer info for the node.

View File

@@ -2952,14 +2952,18 @@ func TestIsDataAvailable(t *testing.T) {
params := testIsAvailableParams{options: []Option{WithGenesisTime(time.Unix(0, 0))}}
ctx, _, service, root, signed := testIsAvailableSetup(t, params)
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
t.Run("Fulu - no commitment in blocks", func(t *testing.T) {
ctx, _, service, root, signed := testIsAvailableSetup(t, testIsAvailableParams{})
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -2977,7 +2981,9 @@ func TestIsDataAvailable(t *testing.T) {
ctx, _, service, root, signed := testIsAvailableSetup(t, params)
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -2989,7 +2995,9 @@ func TestIsDataAvailable(t *testing.T) {
ctx, _, service, root, signed := testIsAvailableSetup(t, params)
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -3037,7 +3045,9 @@ func TestIsDataAvailable(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
defer cancel()
err = service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -3099,7 +3109,9 @@ func TestIsDataAvailable(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
defer cancel()
err = service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NoError(t, err)
})
@@ -3118,7 +3130,9 @@ func TestIsDataAvailable(t *testing.T) {
cancel()
}()
err := service.isDataAvailable(ctx, root, signed)
roBlock, err := consensusblocks.NewROBlockWithRoot(signed, root)
require.NoError(t, err)
err = service.isDataAvailable(ctx, roBlock)
require.NotNil(t, err)
})
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/slasher/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
"github.com/OffchainLabs/prysm/v6/config/features"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -93,7 +92,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
blockCopy, err := block.Copy()
if err != nil {
return err
return errors.Wrap(err, "block copy")
}
preState, err := s.getBlockPreState(ctx, blockCopy.Block())
@@ -104,17 +103,17 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
currentCheckpoints := s.saveCurrentCheckpoints(preState)
roblock, err := blocks.NewROBlockWithRoot(blockCopy, blockRoot)
if err != nil {
return err
return errors.Wrap(err, "new ro block with root")
}
postState, isValidPayload, err := s.validateExecutionAndConsensus(ctx, preState, roblock)
if err != nil {
return err
return errors.Wrap(err, "validator execution and consensus")
}
daWaitedTime, err := s.handleDA(ctx, blockCopy, blockRoot, avs)
daWaitedTime, err := s.handleDA(ctx, avs, roblock)
if err != nil {
return err
return errors.Wrap(err, "handle da")
}
// Defragment the state before continuing block processing.
@@ -135,10 +134,10 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
if err := s.postBlockProcess(args); err != nil {
err := errors.Wrap(err, "could not process block")
tracing.AnnotateError(span, err)
return err
return errors.Wrap(err, "post block process")
}
if err := s.updateCheckpoints(ctx, currentCheckpoints, preState, postState, blockRoot); err != nil {
return err
return errors.Wrap(err, "update checkpoints")
}
// If slasher is configured, forward the attestations in the block via an event feed for processing.
if s.slasherEnabled {
@@ -152,12 +151,12 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
// Have we been finalizing? Should we start saving hot states to db?
if err := s.checkSaveHotStateDB(ctx); err != nil {
return err
return errors.Wrap(err, "check save hot state db")
}
// We apply the same heuristic to some of our more important caches.
if err := s.handleCaches(); err != nil {
return err
return errors.Wrap(err, "handle caches")
}
s.reportPostBlockProcessing(blockCopy, blockRoot, receivedTime, daWaitedTime)
return nil
@@ -240,37 +239,19 @@ func (s *Service) validateExecutionAndConsensus(
return postState, isValidPayload, nil
}
func (s *Service) handleDA(
ctx context.Context,
block interfaces.SignedBeaconBlock,
blockRoot [fieldparams.RootLength]byte,
avs das.AvailabilityStore,
) (elapsed time.Duration, err error) {
defer func(start time.Time) {
elapsed = time.Since(start)
if err == nil {
dataAvailWaitedTime.Observe(float64(elapsed.Milliseconds()))
}
}(time.Now())
if avs == nil {
if err = s.isDataAvailable(ctx, blockRoot, block); err != nil {
return
}
return
func (s *Service) handleDA(ctx context.Context, avs das.AvailabilityStore, block blocks.ROBlock) (time.Duration, error) {
var err error
start := time.Now()
if avs != nil {
err = avs.IsDataAvailable(ctx, s.CurrentSlot(), block)
} else {
err = s.isDataAvailable(ctx, block)
}
var rob blocks.ROBlock
rob, err = blocks.NewROBlockWithRoot(block, blockRoot)
elapsed := time.Since(start)
if err != nil {
return
dataAvailWaitedTime.Observe(float64(elapsed.Milliseconds()))
}
err = avs.IsDataAvailable(ctx, s.CurrentSlot(), rob)
return
return elapsed, err
}
func (s *Service) reportPostBlockProcessing(

View File

@@ -192,7 +192,9 @@ func TestHandleDA(t *testing.T) {
require.NoError(t, err)
s, _ := minimalTestService(t)
elapsed, err := s.handleDA(t.Context(), signedBeaconBlock, [fieldparams.RootLength]byte{}, nil)
block, err := blocks.NewROBlockWithRoot(signedBeaconBlock, [32]byte{})
require.NoError(t, err)
elapsed, err := s.handleDA(t.Context(), nil, block)
require.NoError(t, err)
require.Equal(t, true, elapsed > 0, "Elapsed time should be greater than 0")
}

View File

@@ -24,8 +24,8 @@ import (
p2pTesting "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
@@ -89,7 +89,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
func (mb *mockBroadcaster) BroadcastDataColumnSidecar(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
func (mb *mockBroadcaster) BroadcastDataColumnSidecar(_ uint64, _ blocks.VerifiedRODataColumn) error {
mb.broadcastCalled = true
return nil
}

View File

@@ -19,11 +19,11 @@ go_library(
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/trie:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",

View File

@@ -4,15 +4,10 @@ import (
"encoding/binary"
"math"
"slices"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/crypto/hash"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/holiman/uint256"
"github.com/pkg/errors"
@@ -20,12 +15,9 @@ import (
var (
// Custom errors
ErrCustodyGroupTooLarge = errors.New("custody group too large")
ErrCustodyGroupCountTooLarge = errors.New("custody group count too large")
ErrSizeMismatch = errors.New("mismatch in the number of blob KZG commitments and cellsAndProofs")
ErrNotEnoughDataColumnSidecars = errors.New("not enough columns")
ErrDataColumnSidecarsNotSortedByIndex = errors.New("data column sidecars are not sorted by index")
errWrongComputedCustodyGroupCount = errors.New("wrong computed custody group count, should never happen")
ErrCustodyGroupTooLarge = errors.New("custody group too large")
ErrCustodyGroupCountTooLarge = errors.New("custody group count too large")
errWrongComputedCustodyGroupCount = errors.New("wrong computed custody group count, should never happen")
// maxUint256 is the maximum value of an uint256.
maxUint256 = &uint256.Int{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64}
@@ -117,44 +109,6 @@ func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) {
return columns, nil
}
// DataColumnSidecars computes the data column sidecars from the signed block, cells and cell proofs.
// The returned value contains pointers to function parameters.
// (If the caller alterates `cellsAndProofs` afterwards, the returned value will be modified as well.)
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#get_data_column_sidecars_from_block
func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, cellsAndProofs []kzg.CellsAndProofs) ([]*ethpb.DataColumnSidecar, error) {
if signedBlock == nil || signedBlock.IsNil() || len(cellsAndProofs) == 0 {
return nil, nil
}
block := signedBlock.Block()
blockBody := block.Body()
blobKzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "blob KZG commitments")
}
if len(blobKzgCommitments) != len(cellsAndProofs) {
return nil, ErrSizeMismatch
}
signedBlockHeader, err := signedBlock.Header()
if err != nil {
return nil, errors.Wrap(err, "signed block header")
}
kzgCommitmentsInclusionProof, err := blocks.MerkleProofKZGCommitments(blockBody)
if err != nil {
return nil, errors.Wrap(err, "merkle proof KZG commitments")
}
dataColumnSidecars, err := dataColumnsSidecars(signedBlockHeader, blobKzgCommitments, kzgCommitmentsInclusionProof, cellsAndProofs)
if err != nil {
return nil, errors.Wrap(err, "data column sidecars")
}
return dataColumnSidecars, nil
}
// ComputeCustodyGroupForColumn computes the custody group for a given column.
// It is the reciprocal function of ComputeColumnsForCustodyGroup.
func ComputeCustodyGroupForColumn(columnIndex uint64) (uint64, error) {
@@ -194,72 +148,3 @@ func CustodyColumns(custodyGroups []uint64) (map[uint64]bool, error) {
return columns, nil
}
// dataColumnsSidecars computes the data column sidecars from the signed block header, the blob KZG commiments,
// the KZG commitment includion proofs and cells and cell proofs.
// The returned value contains pointers to function parameters.
// (If the caller alterates input parameters afterwards, the returned value will be modified as well.)
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#get_data_column_sidecars
func dataColumnsSidecars(
signedBlockHeader *ethpb.SignedBeaconBlockHeader,
blobKzgCommitments [][]byte,
kzgCommitmentsInclusionProof [][]byte,
cellsAndProofs []kzg.CellsAndProofs,
) ([]*ethpb.DataColumnSidecar, error) {
start := time.Now()
if len(blobKzgCommitments) != len(cellsAndProofs) {
return nil, ErrSizeMismatch
}
numberOfColumns := params.BeaconConfig().NumberOfColumns
blobsCount := len(cellsAndProofs)
sidecars := make([]*ethpb.DataColumnSidecar, 0, numberOfColumns)
for columnIndex := range numberOfColumns {
column := make([]kzg.Cell, 0, blobsCount)
kzgProofOfColumn := make([]kzg.Proof, 0, blobsCount)
for rowIndex := range blobsCount {
cellsForRow := cellsAndProofs[rowIndex].Cells
proofsForRow := cellsAndProofs[rowIndex].Proofs
// Validate that we have enough cells and proofs for this column index
if columnIndex >= uint64(len(cellsForRow)) {
return nil, errors.Errorf("column index %d exceeds cells length %d for blob %d", columnIndex, len(cellsForRow), rowIndex)
}
if columnIndex >= uint64(len(proofsForRow)) {
return nil, errors.Errorf("column index %d exceeds proofs length %d for blob %d", columnIndex, len(proofsForRow), rowIndex)
}
cell := cellsForRow[columnIndex]
column = append(column, cell)
kzgProof := proofsForRow[columnIndex]
kzgProofOfColumn = append(kzgProofOfColumn, kzgProof)
}
columnBytes := make([][]byte, 0, blobsCount)
for i := range column {
columnBytes = append(columnBytes, column[i][:])
}
kzgProofOfColumnBytes := make([][]byte, 0, blobsCount)
for _, kzgProof := range kzgProofOfColumn {
kzgProofOfColumnBytes = append(kzgProofOfColumnBytes, kzgProof[:])
}
sidecar := &ethpb.DataColumnSidecar{
Index: columnIndex,
Column: columnBytes,
KzgCommitments: blobKzgCommitments,
KzgProofs: kzgProofOfColumnBytes,
SignedBlockHeader: signedBlockHeader,
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
sidecars = append(sidecars, sidecar)
}
dataColumnComputationTime.Observe(float64(time.Since(start).Milliseconds()))
return sidecars, nil
}

View File

@@ -3,13 +3,9 @@ package peerdas_test
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/ethereum/go-ethereum/p2p/enode"
)
@@ -31,93 +27,6 @@ func TestComputeColumnsForCustodyGroup(t *testing.T) {
require.ErrorIs(t, err, peerdas.ErrCustodyGroupTooLarge)
}
func TestDataColumnSidecars(t *testing.T) {
t.Run("nil signed block", func(t *testing.T) {
var expected []*ethpb.DataColumnSidecar = nil
actual, err := peerdas.DataColumnSidecars(nil, []kzg.CellsAndProofs{})
require.NoError(t, err)
require.DeepSSZEqual(t, expected, actual)
})
t.Run("empty cells and proofs", func(t *testing.T) {
// Create a protobuf signed beacon block.
signedBeaconBlockPb := util.NewBeaconBlockDeneb()
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
actual, err := peerdas.DataColumnSidecars(signedBeaconBlock, []kzg.CellsAndProofs{})
require.NoError(t, err)
require.IsNil(t, actual)
})
t.Run("sizes mismatch", func(t *testing.T) {
// Create a protobuf signed beacon block.
signedBeaconBlockPb := util.NewBeaconBlockDeneb()
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs.
cellsAndProofs := make([]kzg.CellsAndProofs, 1)
_, err = peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
require.ErrorIs(t, err, peerdas.ErrSizeMismatch)
})
t.Run("cells array too short for column index", func(t *testing.T) {
// Create a Fulu block with a blob commitment.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{make([]byte, 48)}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with insufficient cells for the number of columns.
// This simulates a scenario where cellsAndProofs has fewer cells than expected columns.
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, 10), // Only 10 cells
Proofs: make([]kzg.Proof, 10), // Only 10 proofs
},
}
// This should fail because the function will try to access columns up to NumberOfColumns
// but we only have 10 cells/proofs.
_, err = peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
require.ErrorContains(t, "column index", err)
require.ErrorContains(t, "exceeds cells length", err)
})
t.Run("proofs array too short for column index", func(t *testing.T) {
// Create a Fulu block with a blob commitment.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{make([]byte, 48)}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with sufficient cells but insufficient proofs.
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, 5), // Only 5 proofs, less than columns
},
}
// This should fail when trying to access proof beyond index 4.
_, err = peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
require.ErrorContains(t, "column index", err)
require.ErrorContains(t, "exceeds proofs length", err)
})
}
func TestComputeCustodyGroupForColumn(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()

View File

@@ -63,17 +63,14 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
t.Run("invalid proof", func(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0][0]++ // It is OK to overflow
roDataColumnSidecars := generateRODataColumnSidecars(t, sidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(roDataColumnSidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.ErrorIs(t, err, peerdas.ErrInvalidKZGProof)
})
t.Run("nominal", func(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
roDataColumnSidecars := generateRODataColumnSidecars(t, sidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(roDataColumnSidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.NoError(t, err)
})
}
@@ -256,9 +253,8 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_SameCommitments_NoBatch(b *testin
for i := range int64(b.N) {
// Generate new random sidecars to ensure the KZG backend does not cache anything.
sidecars := generateRandomSidecars(b, i, blobCount)
roDataColumnSidecars := generateRODataColumnSidecars(b, sidecars)
for _, sidecar := range roDataColumnSidecars {
for _, sidecar := range sidecars {
sidecars := []blocks.RODataColumn{sidecar}
b.StartTimer()
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
@@ -282,7 +278,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch(b *testing.
b.ResetTimer()
for j := range int64(b.N) {
allSidecars := make([]*ethpb.DataColumnSidecar, 0, numberOfColumns)
allSidecars := make([]blocks.RODataColumn, 0, numberOfColumns)
for k := int64(0); k < numberOfColumns; k += columnsCount {
// Use different seeds to generate different blobs/commitments
seed := int64(b.N*i) + numberOfColumns*j + blobCount*k
@@ -292,10 +288,8 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch(b *testing.
allSidecars = append(allSidecars, sidecars[k:k+columnsCount]...)
}
roDataColumnSidecars := generateRODataColumnSidecars(b, allSidecars)
b.StartTimer()
err := peerdas.VerifyDataColumnsSidecarKZGProofs(roDataColumnSidecars)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allSidecars)
b.StopTimer()
require.NoError(b, err)
}
@@ -323,8 +317,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch4(b *testing
for j := range int64(batchCount) {
// Use different seeds to generate different blobs/commitments
sidecars := generateRandomSidecars(b, int64(batchCount)*i+j*blobCount, blobCount)
roDataColumnSidecars := generateRODataColumnSidecars(b, sidecars[:columnsCount])
allSidecars = append(allSidecars, roDataColumnSidecars)
allSidecars = append(allSidecars, sidecars)
}
for _, sidecars := range allSidecars {
@@ -358,7 +351,7 @@ func createTestSidecar(t *testing.T, index uint64, column, kzgCommitments, kzgPr
return roSidecar
}
func generateRandomSidecars(t testing.TB, seed, blobCount int64) []*ethpb.DataColumnSidecar {
func generateRandomSidecars(t testing.TB, seed, blobCount int64) []blocks.RODataColumn {
dbBlock := util.NewBeaconBlockDeneb()
commitments := make([][]byte, 0, blobCount)
@@ -379,20 +372,10 @@ func generateRandomSidecars(t testing.TB, seed, blobCount int64) []*ethpb.DataCo
require.NoError(t, err)
cellsAndProofs := util.GenerateCellsAndProofs(t, blobs)
sidecars, err := peerdas.DataColumnSidecars(sBlock, cellsAndProofs)
rob, err := blocks.NewROBlock(sBlock)
require.NoError(t, err)
sidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.NoError(t, err)
return sidecars
}
func generateRODataColumnSidecars(t testing.TB, sidecars []*ethpb.DataColumnSidecar) []blocks.RODataColumn {
roDataColumnSidecars := make([]blocks.RODataColumn, 0, len(sidecars))
for _, sidecar := range sidecars {
roCol, err := blocks.NewRODataColumn(sidecar)
require.NoError(t, err)
roDataColumnSidecars = append(roDataColumnSidecars, roCol)
}
return roDataColumnSidecars
}

View File

@@ -5,7 +5,7 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
pb "github.com/OffchainLabs/prysm/v6/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
@@ -16,6 +16,7 @@ var (
ErrBlobIndexTooHigh = errors.New("blob index is too high")
ErrBlockRootMismatch = errors.New("block root mismatch")
ErrBlobsCellsProofsMismatch = errors.New("blobs and cells proofs mismatch")
ErrNilBlobAndProof = errors.New("nil blob and proof")
)
// MinimumColumnCountToReconstruct return the minimum number of columns needed to proceed to a reconstruction.
@@ -62,12 +63,6 @@ func ReconstructDataColumnSidecars(verifiedRoSidecars []blocks.VerifiedRODataCol
return nil, ErrNotEnoughDataColumnSidecars
}
// Sidecars are verified and are committed to the same block.
// All signed block headers, KZG commitments, and inclusion proofs are the same.
signedBlockHeader := referenceSidecar.SignedBlockHeader
kzgCommitments := referenceSidecar.KzgCommitments
kzgCommitmentsInclusionProof := referenceSidecar.KzgCommitmentsInclusionProof
// Recover cells and compute proofs in parallel.
var wg errgroup.Group
cellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
@@ -100,80 +95,22 @@ func ReconstructDataColumnSidecars(verifiedRoSidecars []blocks.VerifiedRODataCol
return nil, errors.Wrap(err, "wait for RecoverCellsAndKZGProofs")
}
reconstructedSidecars, err := dataColumnsSidecars(signedBlockHeader, kzgCommitments, kzgCommitmentsInclusionProof, cellsAndProofs)
outSidecars, err := DataColumnSidecars(cellsAndProofs, PopulateFromSidecar(referenceSidecar))
if err != nil {
return nil, errors.Wrap(err, "data column sidecars from items")
}
// Input sidecars are verified, and we reconstructed ourselves the missing sidecars.
// As a consequence, reconstructed sidecars are also verified.
reconstructedVerifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(reconstructedSidecars))
for _, sidecar := range reconstructedSidecars {
roSidecar, err := blocks.NewRODataColumnWithRoot(sidecar, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "new RO data column with root")
}
verifiedRoSidecar := blocks.NewVerifiedRODataColumn(roSidecar)
reconstructedVerifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(outSidecars))
for _, sidecar := range outSidecars {
verifiedRoSidecar := blocks.NewVerifiedRODataColumn(sidecar)
reconstructedVerifiedRoSidecars = append(reconstructedVerifiedRoSidecars, verifiedRoSidecar)
}
return reconstructedVerifiedRoSidecars, nil
}
// ConstructDataColumnSidecars constructs data column sidecars from a block, (un-extended) blobs and
// cell proofs corresponding the extended blobs. The main purpose of this function is to
// construct data column sidecars from data obtained from the execution client via:
// - `engine_getBlobsV2` - https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#engine_getblobsv2, or
// - `engine_getPayloadV5` - https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#engine_getpayloadv5
// Note: In this function, to stick with the `BlobsBundleV2` format returned by the execution client in `engine_getPayloadV5`,
// cell proofs are "flattened".
func ConstructDataColumnSidecars(block interfaces.ReadOnlySignedBeaconBlock, blobs [][]byte, cellProofs [][]byte) ([]*ethpb.DataColumnSidecar, error) {
// Check if the cells count is equal to the cell proofs count.
numberOfColumns := params.BeaconConfig().NumberOfColumns
blobCount := uint64(len(blobs))
cellProofsCount := uint64(len(cellProofs))
cellsCount := blobCount * numberOfColumns
if cellsCount != cellProofsCount {
return nil, ErrBlobsCellsProofsMismatch
}
cellsAndProofs := make([]kzg.CellsAndProofs, 0, blobCount)
for i, blob := range blobs {
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blob) != len(kzgBlob) {
return nil, errors.New("wrong blob size - should never happen")
}
// Compute the extended cells from the (non-extended) blob.
cells, err := kzg.ComputeCells(&kzgBlob)
if err != nil {
return nil, errors.Wrap(err, "compute cells")
}
var proofs []kzg.Proof
for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ {
var kzgProof kzg.Proof
if copy(kzgProof[:], cellProofs[idx]) != len(kzgProof) {
return nil, errors.New("wrong KZG proof size - should never happen")
}
proofs = append(proofs, kzgProof)
}
cellsProofs := kzg.CellsAndProofs{Cells: cells, Proofs: proofs}
cellsAndProofs = append(cellsAndProofs, cellsProofs)
}
dataColumnSidecars, err := DataColumnSidecars(block, cellsAndProofs)
if err != nil {
return nil, errors.Wrap(err, "data column sidcars")
}
return dataColumnSidecars, nil
}
// ReconstructBlobs constructs verified read only blobs sidecars from verified read only blob sidecars.
// The following constraints must be satisfied:
// - All `dataColumnSidecars` has to be committed to the same block, and
@@ -256,6 +193,89 @@ func ReconstructBlobs(block blocks.ROBlock, verifiedDataColumnSidecars []blocks.
return blobSidecars, nil
}
// ComputeCellsAndProofsFromFlat computes the cells and proofs from blobs and cell flat proofs.
func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([]kzg.CellsAndProofs, error) {
numberOfColumns := params.BeaconConfig().NumberOfColumns
blobCount := uint64(len(blobs))
cellProofsCount := uint64(len(cellProofs))
cellsCount := blobCount * numberOfColumns
if cellsCount != cellProofsCount {
return nil, ErrBlobsCellsProofsMismatch
}
cellsAndProofs := make([]kzg.CellsAndProofs, 0, blobCount)
for i, blob := range blobs {
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blob) != len(kzgBlob) {
return nil, errors.New("wrong blob size - should never happen")
}
// Compute the extended cells from the (non-extended) blob.
cells, err := kzg.ComputeCells(&kzgBlob)
if err != nil {
return nil, errors.Wrap(err, "compute cells")
}
var proofs []kzg.Proof
for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ {
var kzgProof kzg.Proof
if copy(kzgProof[:], cellProofs[idx]) != len(kzgProof) {
return nil, errors.New("wrong KZG proof size - should never happen")
}
proofs = append(proofs, kzgProof)
}
cellsProofs := kzg.CellsAndProofs{Cells: cells, Proofs: proofs}
cellsAndProofs = append(cellsAndProofs, cellsProofs)
}
return cellsAndProofs, nil
}
// ComputeCellsAndProofs computes the cells and proofs from blobs and cell proofs.
func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([]kzg.CellsAndProofs, error) {
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := make([]kzg.CellsAndProofs, 0, len(blobsAndProofs))
for _, blobAndProof := range blobsAndProofs {
if blobAndProof == nil {
return nil, ErrNilBlobAndProof
}
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blobAndProof.Blob) != len(kzgBlob) {
return nil, errors.New("wrong blob size - should never happen")
}
// Compute the extended cells from the (non-extended) blob.
cells, err := kzg.ComputeCells(&kzgBlob)
if err != nil {
return nil, errors.Wrap(err, "compute cells")
}
kzgProofs := make([]kzg.Proof, 0, numberOfColumns*kzg.BytesPerProof)
for _, kzgProofBytes := range blobAndProof.KzgProofs {
if len(kzgProofBytes) != kzg.BytesPerProof {
return nil, errors.New("wrong KZG proof size - should never happen")
}
var kzgProof kzg.Proof
if copy(kzgProof[:], kzgProofBytes) != len(kzgProof) {
return nil, errors.New("wrong copied KZG proof size - should never happen")
}
kzgProofs = append(kzgProofs, kzgProof)
}
cellsProofs := kzg.CellsAndProofs{Cells: cells, Proofs: kzgProofs}
cellsAndProofs = append(cellsAndProofs, cellsProofs)
}
return cellsAndProofs, nil
}
// blobSidecarsFromDataColumnSidecars converts verified data column sidecars to verified blob sidecars.
func blobSidecarsFromDataColumnSidecars(roBlock blocks.ROBlock, dataColumnSidecars []blocks.VerifiedRODataColumn, indices []int) ([]*blocks.VerifiedROBlob, error) {
referenceSidecar := dataColumnSidecars[0]

View File

@@ -9,7 +9,7 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
pb "github.com/OffchainLabs/prysm/v6/proto/engine/v1"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/pkg/errors"
@@ -124,50 +124,6 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
})
}
func TestConstructDataColumnSidecars(t *testing.T) {
const (
blobCount = 3
cellsPerBlob = fieldparams.CellsPerBlob
)
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
roBlock, _, baseVerifiedRoSidecars := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
// Extract blobs and proofs from the sidecars.
blobs := make([][]byte, 0, blobCount)
cellProofs := make([][]byte, 0, cellsPerBlob)
for blobIndex := range blobCount {
blob := make([]byte, 0, cellsPerBlob)
for columnIndex := range cellsPerBlob {
cell := baseVerifiedRoSidecars[columnIndex].Column[blobIndex]
blob = append(blob, cell...)
}
blobs = append(blobs, blob)
for columnIndex := range numberOfColumns {
cellProof := baseVerifiedRoSidecars[columnIndex].KzgProofs[blobIndex]
cellProofs = append(cellProofs, cellProof)
}
}
actual, err := peerdas.ConstructDataColumnSidecars(roBlock, blobs, cellProofs)
require.NoError(t, err)
// Extract the base verified ro sidecars into sidecars.
expected := make([]*ethpb.DataColumnSidecar, 0, len(baseVerifiedRoSidecars))
for _, verifiedRoSidecar := range baseVerifiedRoSidecars {
expected = append(expected, verifiedRoSidecar.DataColumnSidecar)
}
require.DeepSSZEqual(t, expected, actual)
}
func TestReconstructBlobs(t *testing.T) {
// Start the trusted setup.
err := kzg.Start()
@@ -250,7 +206,7 @@ func TestReconstructBlobs(t *testing.T) {
// Compute cells and proofs from blob sidecars.
var wg errgroup.Group
blobs := make([][]byte, blobCount)
cellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
inputCellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
for i := range blobCount {
blob := roBlobSidecars[i].Blob
blobs[i] = blob
@@ -267,7 +223,7 @@ func TestReconstructBlobs(t *testing.T) {
// It is safe for multiple goroutines to concurrently write to the same slice,
// as long as they are writing to different indices, which is the case here.
cellsAndProofs[i] = cp
inputCellsAndProofs[i] = cp
return nil
})
@@ -278,25 +234,24 @@ func TestReconstructBlobs(t *testing.T) {
// Flatten proofs.
cellProofs := make([][]byte, 0, blobCount*numberOfColumns)
for _, cp := range cellsAndProofs {
for _, cp := range inputCellsAndProofs {
for _, proof := range cp.Proofs {
cellProofs = append(cellProofs, proof[:])
}
}
// Construct data column sidecars.
// It is OK to use the public function `ConstructDataColumnSidecars`, as long as
// `TestConstructDataColumnSidecars` tests pass.
dataColumnSidecars, err := peerdas.ConstructDataColumnSidecars(roBlock, blobs, cellProofs)
// Compute celles and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(blobs, cellProofs)
require.NoError(t, err)
// Construct data column sidears from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
// Convert to verified data column sidecars.
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(dataColumnSidecars))
for _, dataColumnSidecar := range dataColumnSidecars {
roSidecar, err := blocks.NewRODataColumn(dataColumnSidecar)
require.NoError(t, err)
verifiedRoSidecar := blocks.NewVerifiedRODataColumn(roSidecar)
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roDataColumnSidecars))
for _, roDataColumnSidecar := range roDataColumnSidecars {
verifiedRoSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar)
verifiedRoSidecars = append(verifiedRoSidecars, verifiedRoSidecar)
}
@@ -339,3 +294,162 @@ func TestReconstructBlobs(t *testing.T) {
})
}
func TestComputeCellsAndProofsFromFlat(t *testing.T) {
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
t.Run("mismatched blob and proof counts", func(t *testing.T) {
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Create one blob but proofs for two blobs
blobs := [][]byte{{}}
// Create proofs for 2 blobs worth of columns
cellProofs := make([][]byte, 2*numberOfColumns)
_, err := peerdas.ComputeCellsAndProofsFromFlat(blobs, cellProofs)
require.ErrorIs(t, err, peerdas.ErrBlobsCellsProofsMismatch)
})
t.Run("nominal", func(t *testing.T) {
const blobCount = 2
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Generate test blobs
_, roBlobSidecars := util.GenerateTestElectraBlockWithSidecar(t, [fieldparams.RootLength]byte{}, 42, blobCount)
// Extract blobs and compute expected cells and proofs
blobs := make([][]byte, blobCount)
expectedCellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
var wg errgroup.Group
for i := range blobCount {
blob := roBlobSidecars[i].Blob
blobs[i] = blob
wg.Go(func() error {
var kzgBlob kzg.Blob
count := copy(kzgBlob[:], blob)
require.Equal(t, len(kzgBlob), count)
cp, err := kzg.ComputeCellsAndKZGProofs(&kzgBlob)
if err != nil {
return errors.Wrapf(err, "compute cells and kzg proofs for blob %d", i)
}
expectedCellsAndProofs[i] = cp
return nil
})
}
err := wg.Wait()
require.NoError(t, err)
// Flatten proofs
cellProofs := make([][]byte, 0, blobCount*numberOfColumns)
for _, cp := range expectedCellsAndProofs {
for _, proof := range cp.Proofs {
cellProofs = append(cellProofs, proof[:])
}
}
// Test ComputeCellsAndProofs
actualCellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(blobs, cellProofs)
require.NoError(t, err)
require.Equal(t, blobCount, len(actualCellsAndProofs))
// Verify the results match expected
for i := range blobCount {
require.Equal(t, len(expectedCellsAndProofs[i].Cells), len(actualCellsAndProofs[i].Cells))
require.Equal(t, len(expectedCellsAndProofs[i].Proofs), len(actualCellsAndProofs[i].Proofs))
// Compare cells
for j, expectedCell := range expectedCellsAndProofs[i].Cells {
require.Equal(t, expectedCell, actualCellsAndProofs[i].Cells[j])
}
// Compare proofs
for j, expectedProof := range expectedCellsAndProofs[i].Proofs {
require.Equal(t, expectedProof, actualCellsAndProofs[i].Proofs[j])
}
}
})
}
func TestComputeCellsAndProofsFromStructured(t *testing.T) {
t.Run("nil blob and proof", func(t *testing.T) {
_, err := peerdas.ComputeCellsAndProofsFromStructured([]*pb.BlobAndProofV2{nil})
require.ErrorIs(t, err, peerdas.ErrNilBlobAndProof)
})
t.Run("nominal", func(t *testing.T) {
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
const blobCount = 2
// Generate test blobs
_, roBlobSidecars := util.GenerateTestElectraBlockWithSidecar(t, [fieldparams.RootLength]byte{}, 42, blobCount)
// Extract blobs and compute expected cells and proofs
blobsAndProofs := make([]*pb.BlobAndProofV2, blobCount)
expectedCellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
var wg errgroup.Group
for i := range blobCount {
blob := roBlobSidecars[i].Blob
wg.Go(func() error {
var kzgBlob kzg.Blob
count := copy(kzgBlob[:], blob)
require.Equal(t, len(kzgBlob), count)
cellsAndProofs, err := kzg.ComputeCellsAndKZGProofs(&kzgBlob)
if err != nil {
return errors.Wrapf(err, "compute cells and kzg proofs for blob %d", i)
}
expectedCellsAndProofs[i] = cellsAndProofs
kzgProofs := make([][]byte, 0, len(cellsAndProofs.Proofs))
for _, proof := range cellsAndProofs.Proofs {
kzgProofs = append(kzgProofs, proof[:])
}
blobAndProof := &pb.BlobAndProofV2{
Blob: blob,
KzgProofs: kzgProofs,
}
blobsAndProofs[i] = blobAndProof
return nil
})
}
err = wg.Wait()
require.NoError(t, err)
// Test ComputeCellsAndProofs
actualCellsAndProofs, err := peerdas.ComputeCellsAndProofsFromStructured(blobsAndProofs)
require.NoError(t, err)
require.Equal(t, blobCount, len(actualCellsAndProofs))
// Verify the results match expected
for i := range blobCount {
require.Equal(t, len(expectedCellsAndProofs[i].Cells), len(actualCellsAndProofs[i].Cells))
require.Equal(t, len(expectedCellsAndProofs[i].Proofs), len(actualCellsAndProofs[i].Proofs))
// Compare cells
for j, expectedCell := range expectedCellsAndProofs[i].Cells {
require.Equal(t, expectedCell, actualCellsAndProofs[i].Cells[j])
}
// Compare proofs
for j, expectedProof := range expectedCellsAndProofs[i].Proofs {
require.Equal(t, expectedProof, actualCellsAndProofs[i].Proofs[j])
}
}
})
}

View File

@@ -1,12 +1,76 @@
package peerdas
import (
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
beaconState "github.com/OffchainLabs/prysm/v6/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/pkg/errors"
)
var (
ErrNilSignedBlockOrEmptyCellsAndProofs = errors.New("nil signed block or empty cells and proofs")
ErrSizeMismatch = errors.New("mismatch in the number of blob KZG commitments and cellsAndProofs")
ErrNotEnoughDataColumnSidecars = errors.New("not enough columns")
ErrDataColumnSidecarsNotSortedByIndex = errors.New("data column sidecars are not sorted by index")
)
var (
_ ConstructionPopulator = (*BlockReconstructionSource)(nil)
_ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
)
const (
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
)
type (
// ConstructionPopulator is an interface that can be satisfied by a type that can use data from a struct
// like a DataColumnSidecar or a BeaconBlock to set the fields in a data column sidecar that cannot
// be obtained from the engine api.
ConstructionPopulator interface {
Slot() primitives.Slot
Root() [fieldparams.RootLength]byte
ProposerIndex() primitives.ValidatorIndex
Commitments() ([][]byte, error)
Type() string
extract() (*blockInfo, error)
}
// BlockReconstructionSource is a ConstructionPopulator that uses a beacon block as the source of data
BlockReconstructionSource struct {
blocks.ROBlock
}
// DataColumnSidecar is a ConstructionPopulator that uses a data column sidecar as the source of data
SidecarReconstructionSource struct {
blocks.VerifiedRODataColumn
}
blockInfo struct {
signedBlockHeader *ethpb.SignedBeaconBlockHeader
kzgCommitments [][]byte
kzgInclusionProof [][]byte
}
)
// PopulateFromBlock creates a BlockReconstructionSource from a beacon block
func PopulateFromBlock(block blocks.ROBlock) *BlockReconstructionSource {
return &BlockReconstructionSource{ROBlock: block}
}
// PopulateFromSidecar creates a SidecarReconstructionSource from a data column sidecar
func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstructionSource {
return &SidecarReconstructionSource{VerifiedRODataColumn: sidecar}
}
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
@@ -28,3 +92,159 @@ func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validat
count := totalNodeBalance / balancePerAdditionalCustodyGroup
return min(max(count, validatorCustodyRequirement), numberOfCustodyGroups), nil
}
// DataColumnSidecars, given ConstructionPopulator and the cells/proofs associated with each blob in the
// block, assembles sidecars which can be distributed to peers.
// This is an adapted version of
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#get_data_column_sidecars,
// which is designed to be used both when constructing sidecars from a block and from a sidecar, replacing
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#get_data_column_sidecars_from_block and
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#get_data_column_sidecars_from_column_sidecar
func DataColumnSidecars(rows []kzg.CellsAndProofs, src ConstructionPopulator) ([]blocks.RODataColumn, error) {
if len(rows) == 0 {
return nil, nil
}
start := time.Now()
cells, proofs, err := rotateRowsToCols(rows, params.BeaconConfig().NumberOfColumns)
if err != nil {
return nil, errors.Wrap(err, "rotate cells and proofs")
}
maxIdx := params.BeaconConfig().NumberOfColumns
roSidecars := make([]blocks.RODataColumn, 0, maxIdx)
for idx := range maxIdx {
info, err := src.extract()
if err != nil {
return nil, errors.Wrap(err, "extract block info")
}
sidecar := &ethpb.DataColumnSidecar{
Index: idx,
Column: cells[idx],
KzgCommitments: info.kzgCommitments,
KzgProofs: proofs[idx],
SignedBlockHeader: info.signedBlockHeader,
KzgCommitmentsInclusionProof: info.kzgInclusionProof,
}
if len(sidecar.KzgCommitments) != len(sidecar.Column) || len(sidecar.KzgCommitments) != len(sidecar.KzgProofs) {
return nil, ErrSizeMismatch
}
roSidecar, err := blocks.NewRODataColumnWithRoot(sidecar, src.Root())
if err != nil {
return nil, errors.Wrap(err, "new ro data column")
}
roSidecars = append(roSidecars, roSidecar)
}
dataColumnComputationTime.Observe(float64(time.Since(start).Milliseconds()))
return roSidecars, nil
}
// Slot returns the slot of the source
func (s *BlockReconstructionSource) Slot() primitives.Slot {
return s.Block().Slot()
}
// ProposerIndex returns the proposer index of the source
func (s *BlockReconstructionSource) ProposerIndex() primitives.ValidatorIndex {
return s.Block().ProposerIndex()
}
// Commitments returns the blob KZG commitments of the source
func (s *BlockReconstructionSource) Commitments() ([][]byte, error) {
c, err := s.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "blob KZG commitments")
}
return c, nil
}
// Type returns the type of the source
func (s *BlockReconstructionSource) Type() string {
return BlockType
}
// extract extracts the block information from the source
func (b *BlockReconstructionSource) extract() (*blockInfo, error) {
block := b.Block()
header, err := b.Header()
if err != nil {
return nil, errors.Wrap(err, "header")
}
commitments, err := block.Body().BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "commitments")
}
inclusionProof, err := blocks.MerkleProofKZGCommitments(block.Body())
if err != nil {
return nil, errors.Wrap(err, "merkle proof kzg commitments")
}
info := &blockInfo{
signedBlockHeader: header,
kzgCommitments: commitments,
kzgInclusionProof: inclusionProof,
}
return info, nil
}
// rotateRowsToCols takes a 2D slice of cells and proofs, where the x is rows (blobs) and y is columns,
// and returns a 2D slice where x is columns and y is rows.
func rotateRowsToCols(rows []kzg.CellsAndProofs, numCols uint64) ([][][]byte, [][][]byte, error) {
if len(rows) == 0 {
return nil, nil, nil
}
cellCols := make([][][]byte, numCols)
proofCols := make([][][]byte, numCols)
for i, cp := range rows {
if uint64(len(cp.Cells)) != numCols {
return nil, nil, errors.Wrap(ErrNotEnoughDataColumnSidecars, "not enough cells")
}
if len(cp.Cells) != len(cp.Proofs) {
return nil, nil, errors.Wrap(ErrNotEnoughDataColumnSidecars, "not enough proofs")
}
for j := uint64(0); j < numCols; j++ {
if i == 0 {
cellCols[j] = make([][]byte, len(rows))
proofCols[j] = make([][]byte, len(rows))
}
cellCols[j][i] = cp.Cells[j][:]
proofCols[j][i] = cp.Proofs[j][:]
}
}
return cellCols, proofCols, nil
}
// Root returns the block root of the source
func (s *SidecarReconstructionSource) Root() [fieldparams.RootLength]byte {
return s.BlockRoot()
}
// Commmitments returns the blob KZG commitments of the source
func (s *SidecarReconstructionSource) Commitments() ([][]byte, error) {
return s.KzgCommitments, nil
}
// Type returns the type of the source
func (s *SidecarReconstructionSource) Type() string {
return SidecarType
}
// extract extracts the block information from the source
func (s *SidecarReconstructionSource) extract() (*blockInfo, error) {
info := &blockInfo{
signedBlockHeader: s.SignedBlockHeader,
kzgCommitments: s.KzgCommitments,
kzgInclusionProof: s.KzgCommitmentsInclusionProof,
}
return info, nil
}

View File

@@ -3,11 +3,15 @@ package peerdas_test
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
state_native "github.com/OffchainLabs/prysm/v6/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
)
func TestValidatorsCustodyRequirement(t *testing.T) {
@@ -53,3 +57,218 @@ func TestValidatorsCustodyRequirement(t *testing.T) {
})
}
}
func TestDataColumnSidecars(t *testing.T) {
t.Run("sizes mismatch", func(t *testing.T) {
// Create a protobuf signed beacon block.
signedBeaconBlockPb := util.NewBeaconBlockDeneb()
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs.
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, params.BeaconConfig().NumberOfColumns),
Proofs: make([]kzg.Proof, params.BeaconConfig().NumberOfColumns),
},
}
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
_, err = peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.ErrorIs(t, err, peerdas.ErrSizeMismatch)
})
t.Run("cells array too short for column index", func(t *testing.T) {
// Create a Fulu block with a blob commitment.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{make([]byte, 48)}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with insufficient cells for the number of columns.
// This simulates a scenario where cellsAndProofs has fewer cells than expected columns.
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, 10), // Only 10 cells
Proofs: make([]kzg.Proof, 10), // Only 10 proofs
},
}
// This should fail because the function will try to access columns up to NumberOfColumns
// but we only have 10 cells/proofs.
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
_, err = peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.ErrorIs(t, err, peerdas.ErrNotEnoughDataColumnSidecars)
})
t.Run("proofs array too short for column index", func(t *testing.T) {
// Create a Fulu block with a blob commitment.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{make([]byte, 48)}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with sufficient cells but insufficient proofs.
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, 5), // Only 5 proofs, less than columns
},
}
// This should fail when trying to access proof beyond index 4.
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
_, err = peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.ErrorIs(t, err, peerdas.ErrNotEnoughDataColumnSidecars)
require.ErrorContains(t, "not enough proofs", err)
})
t.Run("nominal", func(t *testing.T) {
// Create a Fulu block with blob commitments.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
commitment1 := make([]byte, 48)
commitment2 := make([]byte, 48)
// Set different values to distinguish commitments
commitment1[0] = 0x01
commitment2[0] = 0x02
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{commitment1, commitment2}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with correct dimensions.
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, numberOfColumns),
},
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, numberOfColumns),
},
}
// Set distinct values in cells and proofs for testing
for i := range numberOfColumns {
cellsAndProofs[0].Cells[i][0] = byte(i)
cellsAndProofs[0].Proofs[i][0] = byte(i)
cellsAndProofs[1].Cells[i][0] = byte(i + 128)
cellsAndProofs[1].Proofs[i][0] = byte(i + 128)
}
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
sidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.NoError(t, err)
require.NotNil(t, sidecars)
require.Equal(t, int(numberOfColumns), len(sidecars))
// Verify each sidecar has the expected structure
for i, sidecar := range sidecars {
require.Equal(t, uint64(i), sidecar.Index)
require.Equal(t, 2, len(sidecar.Column))
require.Equal(t, 2, len(sidecar.KzgCommitments))
require.Equal(t, 2, len(sidecar.KzgProofs))
// Verify commitments match what we set
require.DeepEqual(t, commitment1, sidecar.KzgCommitments[0])
require.DeepEqual(t, commitment2, sidecar.KzgCommitments[1])
// Verify column data comes from the correct cells
require.Equal(t, byte(i), sidecar.Column[0][0])
require.Equal(t, byte(i+128), sidecar.Column[1][0])
// Verify proofs come from the correct proofs
require.Equal(t, byte(i), sidecar.KzgProofs[0][0])
require.Equal(t, byte(i+128), sidecar.KzgProofs[1][0])
}
})
}
func TestReconstructionSource(t *testing.T) {
// Create a Fulu block with blob commitments.
signedBeaconBlockPb := util.NewBeaconBlockFulu()
commitment1 := make([]byte, 48)
commitment2 := make([]byte, 48)
// Set different values to distinguish commitments
commitment1[0] = 0x01
commitment2[0] = 0x02
signedBeaconBlockPb.Block.Body.BlobKzgCommitments = [][]byte{commitment1, commitment2}
// Create a signed beacon block from the protobuf.
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb)
require.NoError(t, err)
// Create cells and proofs with correct dimensions.
numberOfColumns := params.BeaconConfig().NumberOfColumns
cellsAndProofs := []kzg.CellsAndProofs{
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, numberOfColumns),
},
{
Cells: make([]kzg.Cell, numberOfColumns),
Proofs: make([]kzg.Proof, numberOfColumns),
},
}
// Set distinct values in cells and proofs for testing
for i := range numberOfColumns {
cellsAndProofs[0].Cells[i][0] = byte(i)
cellsAndProofs[0].Proofs[i][0] = byte(i)
cellsAndProofs[1].Cells[i][0] = byte(i + 128)
cellsAndProofs[1].Proofs[i][0] = byte(i + 128)
}
rob, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
sidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.NoError(t, err)
require.NotNil(t, sidecars)
require.Equal(t, int(numberOfColumns), len(sidecars))
t.Run("from block", func(t *testing.T) {
src := peerdas.PopulateFromBlock(rob)
require.Equal(t, rob.Block().Slot(), src.Slot())
require.Equal(t, rob.Root(), src.Root())
require.Equal(t, rob.Block().ProposerIndex(), src.ProposerIndex())
commitments, err := src.Commitments()
require.NoError(t, err)
require.Equal(t, 2, len(commitments))
require.DeepEqual(t, commitment1, commitments[0])
require.DeepEqual(t, commitment2, commitments[1])
require.Equal(t, peerdas.BlockType, src.Type())
})
t.Run("from sidecar", func(t *testing.T) {
referenceSidecar := blocks.NewVerifiedRODataColumn(sidecars[0])
src := peerdas.PopulateFromSidecar(referenceSidecar)
require.Equal(t, referenceSidecar.Slot(), src.Slot())
require.Equal(t, referenceSidecar.BlockRoot(), src.Root())
require.Equal(t, referenceSidecar.ProposerIndex(), src.ProposerIndex())
commitments, err := src.Commitments()
require.NoError(t, err)
require.Equal(t, 2, len(commitments))
require.DeepEqual(t, commitment1, commitments[0])
require.DeepEqual(t, commitment2, commitments[1])
require.Equal(t, peerdas.SidecarType, src.Type())
})
}

View File

@@ -259,7 +259,6 @@ func (dcs *DataColumnStorage) Summary(root [fieldparams.RootLength]byte) DataCol
}
// Save saves data column sidecars into the database and asynchronously performs pruning.
// The returned channel is closed when the pruning is complete.
func (dcs *DataColumnStorage) Save(dataColumnSidecars []blocks.VerifiedRODataColumn) error {
startTime := time.Now()

View File

@@ -25,6 +25,7 @@ go_library(
"//testing/spectest:__subpackages__",
],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/cache/depositsnapshot:go_default_library",
"//beacon-chain/core/altair:go_default_library",
@@ -103,6 +104,7 @@ go_test(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/execution/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
@@ -101,10 +102,7 @@ const (
defaultEngineTimeout = time.Second
)
var (
errInvalidPayloadBodyResponse = errors.New("engine api payload body response is invalid")
errMissingBlobsAndProofsFromEL = errors.New("engine api payload body response is missing blobs and proofs")
)
var errInvalidPayloadBodyResponse = errors.New("engine api payload body response is invalid")
// ForkchoiceUpdatedResponse is the response kind received by the
// engine_forkchoiceUpdatedV1 endpoint.
@@ -123,7 +121,7 @@ type Reconstructor interface {
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error)
ReconstructDataColumnSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error)
}
// EngineCaller defines a client that can interact with an Ethereum
@@ -651,22 +649,40 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
return verifiedBlobs, nil
}
// ReconstructDataColumnSidecars reconstructs the verified data column sidecars for a given beacon block.
// It retrieves the KZG commitments from the block body, fetches the associated blobs and cell proofs from the EL,
// and constructs the corresponding verified read-only data column sidecars.
func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, signedROBlock interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error) {
block := signedROBlock.Block()
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
root := populator.Root()
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", blockRoot),
"slot": block.Slot(),
})
kzgCommitments, err := block.Body().BlobKzgCommitments()
// Fetch cells and proofs from the execution client using the KZG commitments from the sidecar.
commitments, err := populator.Commitments()
if err != nil {
return nil, wrapWithBlockRoot(err, blockRoot, "blob KZG commitments")
return nil, wrapWithBlockRoot(err, root, "commitments")
}
cellsAndProofs, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
if err != nil {
return nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
}
// Return early if nothing is returned from the EL.
if len(cellsAndProofs) == 0 {
return nil, nil
}
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, populator)
if err != nil {
return nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, nil
}
// fetchCellsAndProofsFromExecution fetches cells and proofs from the execution client (using engine_getBlobsV2 execution API method)
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) ([]kzg.CellsAndProofs, error) {
// Collect KZG hashes for all blobs.
versionedHashes := make([]common.Hash, 0, len(kzgCommitments))
for _, commitment := range kzgCommitments {
@@ -677,47 +693,32 @@ func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, signedROBlo
// Fetch all blobsAndCellsProofs from the execution client.
blobAndProofV2s, err := s.GetBlobsV2(ctx, versionedHashes)
if err != nil {
return nil, wrapWithBlockRoot(err, blockRoot, "get blobs V2")
return nil, errors.Wrapf(err, "get blobs V2")
}
// Return early if nothing is returned from the EL.
if len(blobAndProofV2s) == 0 {
log.Debug("No blobs returned from execution client")
return nil, nil
}
// Extract the blobs and proofs from the blobAndProofV2s.
blobs, cellProofs := make([][]byte, 0, len(blobAndProofV2s)), make([][]byte, 0, len(blobAndProofV2s))
for _, blobsAndProofs := range blobAndProofV2s {
if blobsAndProofs == nil {
return nil, wrapWithBlockRoot(errMissingBlobsAndProofsFromEL, blockRoot, "")
}
blobs, cellProofs = append(blobs, blobsAndProofs.Blob), append(cellProofs, blobsAndProofs.KzgProofs...)
}
// Construct the data column sidcars from the blobs and cell proofs provided by the execution client.
dataColumnSidecars, err := peerdas.ConstructDataColumnSidecars(signedROBlock, blobs, cellProofs)
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromStructured(blobAndProofV2s)
if err != nil {
return nil, wrapWithBlockRoot(err, blockRoot, "construct data column sidecars")
return nil, errors.Wrap(err, "compute cells and proofs")
}
// Finally, construct verified RO data column sidecars.
// We trust the execution layer we are connected to, so we can upgrade the read only data column sidecar into a verified one.
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(dataColumnSidecars))
for _, dataColumnSidecar := range dataColumnSidecars {
roDataColumn, err := blocks.NewRODataColumnWithRoot(dataColumnSidecar, blockRoot)
if err != nil {
return nil, wrapWithBlockRoot(err, blockRoot, "new read-only data column with root")
}
return cellsAndProofs, nil
}
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
// upgradeSidecarsToVerifiedSidecars upgrades a list of data column sidecars into verified data column sidecars.
func upgradeSidecarsToVerifiedSidecars(roSidecars []blocks.RODataColumn) []blocks.VerifiedRODataColumn {
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
for _, roSidecar := range roSidecars {
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roSidecar)
verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
}
log.Debug("Data columns successfully reconstructed from the execution client")
return verifiedRODataColumns, nil
return verifiedRODataColumns
}
func fullPayloadFromPayloadBody(
@@ -1009,6 +1010,6 @@ func toBlockNumArg(number *big.Int) string {
}
// wrapWithBlockRoot returns a new error with the given block root.
func wrapWithBlockRoot(err error, blockRoot [32]byte, message string) error {
func wrapWithBlockRoot(err error, blockRoot [fieldparams.RootLength]byte, message string) error {
return errors.Wrap(err, fmt.Sprintf("%s for block %#x", message, blockRoot))
}

View File

@@ -14,6 +14,7 @@ import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
mocks "github.com/OffchainLabs/prysm/v6/beacon-chain/execution/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
@@ -2556,7 +2557,7 @@ func TestReconstructBlobSidecars(t *testing.T) {
})
}
func TestReconstructDataColumnSidecars(t *testing.T) {
func TestConstructDataColumnSidecars(t *testing.T) {
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
@@ -2580,11 +2581,14 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
roBlock, err := blocks.NewROBlockWithRoot(sb, r)
require.NoError(t, err)
ctx := context.Background()
t.Run("GetBlobsV2 is not supported", func(t *testing.T) {
_, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
require.ErrorContains(t, "get blobs V2 for block", err)
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.ErrorContains(t, "engine_getBlobsV2 is not supported", err)
})
t.Run("nothing received", func(t *testing.T) {
@@ -2594,7 +2598,7 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 0, len(dataColumns))
})
@@ -2607,23 +2611,22 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 128, len(dataColumns))
})
t.Run("missing some blobs", func(t *testing.T) {
blobMasks := []bool{false, true, true, true, true, true}
srv := createBlobServerV2(t, 6, blobMasks)
defer srv.Close()
// t.Run("missing some blobs", func(t *testing.T) {
// blobMasks := []bool{false, true, true, true, true, true}
// srv := createBlobServerV2(t, 6, blobMasks)
// defer srv.Close()
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
// rpcClient, client := setupRpcClientV2(t, srv.URL, client)
// defer rpcClient.Close()
dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
require.ErrorContains(t, errMissingBlobsAndProofsFromEL.Error(), err)
require.Equal(t, 0, len(dataColumns))
})
// _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
// require.ErrorContains(t, "fetch cells and proofs from execution client", err)
// })
}
func createRandomKzgCommitments(t *testing.T, num int) [][]byte {

View File

@@ -14,6 +14,7 @@ go_library(
],
deps = [
"//async/event:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/execution/types:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",

View File

@@ -4,6 +4,7 @@ import (
"context"
"math/big"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -116,7 +117,8 @@ func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadO
return e.BlobSidecars, e.ErrorBlobSidecars
}
func (e *EngineClient) ReconstructDataColumnSidecars(context.Context, interfaces.ReadOnlySignedBeaconBlock, [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error) {
// ConstructDataColumnSidecars is a mock implementation of the ConstructDataColumnSidecars method.
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
return e.DataColumnSidecars, e.ErrorDataColumnSidecars
}

View File

@@ -60,6 +60,7 @@ go_library(
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/crypto/hash"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
@@ -308,19 +309,13 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
// BroadcastDataColumnSidecar broadcasts a data column to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input column subnet.
func (s *Service) BroadcastDataColumnSidecar(
root [fieldparams.RootLength]byte,
dataColumnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
dataColumnSidecar blocks.VerifiedRODataColumn,
) error {
// Add tracing to the function.
ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumnSidecar")
defer span.End()
// Ensure the data column sidecar is not nil.
if dataColumnSidecar == nil {
return errors.Errorf("attempted to broadcast nil data column sidecar at subnet %d", dataColumnSubnet)
}
// Retrieve the current fork digest.
forkDigest, err := s.currentForkDigest()
if err != nil {
@@ -330,16 +325,15 @@ func (s *Service) BroadcastDataColumnSidecar(
}
// Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
go s.internalBroadcastDataColumnSidecar(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest)
go s.internalBroadcastDataColumnSidecar(ctx, dataColumnSubnet, dataColumnSidecar, forkDigest)
return nil
}
func (s *Service) internalBroadcastDataColumnSidecar(
ctx context.Context,
root [fieldparams.RootLength]byte,
columnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
dataColumnSidecar blocks.VerifiedRODataColumn,
forkDigest [fieldparams.VersionLength]byte,
) {
// Add tracing to the function.
@@ -385,7 +379,7 @@ func (s *Service) internalBroadcastDataColumnSidecar(
log.WithFields(logrus.Fields{
"slot": slot,
"timeSinceSlotStart": time.Since(slotStartTime),
"root": fmt.Sprintf("%#x", root),
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
"columnSubnet": columnSubnet,
}).Debug("Broadcasted data column sidecar")

View File

@@ -711,13 +711,8 @@ func TestService_BroadcastDataColumn(t *testing.T) {
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topic := fmt.Sprintf(topicFormat, digest, subnet) + service.Encoding().ProtocolSuffix()
roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}})
sidecar := roSidecars[0].DataColumnSidecar
// Attempt to broadcast nil object should fail.
var emptyRoot [fieldparams.RootLength]byte
err = service.BroadcastDataColumnSidecar(emptyRoot, subnet, nil)
require.ErrorContains(t, "attempted to broadcast nil", err)
_, verifiedRoSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}})
verifiedRoSidecar := verifiedRoSidecars[0]
// Subscribe to the topic.
sub, err := p2.SubscribeToTopic(topic)
@@ -727,7 +722,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
time.Sleep(50 * time.Millisecond)
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecar(emptyRoot, subnet, sidecar)
err = service.BroadcastDataColumnSidecar(subnet, verifiedRoSidecar)
require.NoError(t, err)
// Receive the message.
@@ -739,5 +734,5 @@ func TestService_BroadcastDataColumn(t *testing.T) {
var result ethpb.DataColumnSidecar
require.NoError(t, service.Encoding().DecodeGossip(msg.Data, &result))
require.DeepEqual(t, &result, sidecar)
require.DeepEqual(t, &result, verifiedRoSidecar)
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
@@ -51,7 +52,7 @@ type (
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
BroadcastDataColumnSidecar(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
BroadcastDataColumnSidecar(columnSubnet uint64, dataColumnSidecar blocks.VerifiedRODataColumn) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.

View File

@@ -25,6 +25,7 @@ go_library(
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",

View File

@@ -6,6 +6,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
@@ -168,7 +169,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
}
// BroadcastDataColumnSidecar -- fake.
func (*FakeP2P) BroadcastDataColumnSidecar(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
func (*FakeP2P) BroadcastDataColumnSidecar(_ uint64, _ blocks.VerifiedRODataColumn) error {
return nil
}

View File

@@ -5,7 +5,7 @@ import (
"sync"
"sync/atomic"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
@@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumnSidecar([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
func (m *MockBroadcaster) BroadcastDataColumnSidecar(uint64, blocks.VerifiedRODataColumn) error {
m.BroadcastCalled.Store(true)
return nil
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
@@ -232,7 +233,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumnSidecar([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
func (p *TestP2P) BroadcastDataColumnSidecar(uint64, blocks.VerifiedRODataColumn) error {
p.BroadcastCalled.Store(true)
return nil
}

View File

@@ -220,16 +220,13 @@ func TestGetBlob(t *testing.T) {
cellsAndProofsList = append(cellsAndProofsList, cellsAndProogs)
}
dataColumnSidecarPb, err := peerdas.DataColumnSidecars(fuluBlock, cellsAndProofsList)
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofsList, peerdas.PopulateFromBlock(fuluBlock))
require.NoError(t, err)
verifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, len(dataColumnSidecarPb))
for _, sidecarPb := range dataColumnSidecarPb {
roDataColumn, err := blocks.NewRODataColumnWithRoot(sidecarPb, fuluBlockRoot)
require.NoError(t, err)
verifiedRoDataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumn)
verifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roDataColumnSidecars))
for _, roDataColumnSidecar := range roDataColumnSidecars {
verifiedRoDataColumnSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar)
verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumnSidecar)
}
err = db.SaveBlock(t.Context(), fuluBlock)

View File

@@ -281,7 +281,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
var (
blobSidecars []*ethpb.BlobSidecar
dataColumnSidecars []*ethpb.DataColumnSidecar
dataColumnSidecars []blocks.RODataColumn
)
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
@@ -309,10 +309,11 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
rob, err := blocks.NewROBlockWithRoot(block, root)
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(block, req)
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
@@ -348,10 +349,10 @@ func (vs *Server) broadcastAndReceiveSidecars(
block interfaces.SignedBeaconBlock,
root [fieldparams.RootLength]byte,
blobSidecars []*ethpb.BlobSidecar,
dataColumnSideCars []*ethpb.DataColumnSidecar,
dataColumnSidecars []blocks.RODataColumn,
) error {
if block.Version() >= version.Fulu {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSideCars, root); err != nil {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, root); err != nil {
return errors.Wrap(err, "broadcast and receive data columns")
}
return nil
@@ -398,21 +399,28 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
}
func (vs *Server) handleUnblindedBlock(
block interfaces.SignedBeaconBlock,
block blocks.ROBlock,
req *ethpb.GenericSignedBeaconBlock,
) ([]*ethpb.BlobSidecar, []*ethpb.DataColumnSidecar, error) {
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
rawBlobs, proofs, err := blobsAndProofs(req)
if err != nil {
return nil, nil, err
}
if block.Version() >= version.Fulu {
dataColumnSideCars, err := peerdas.ConstructDataColumnSidecars(block, rawBlobs, proofs)
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "construct data column sidecars")
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
return nil, dataColumnSideCars, nil
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, errors.Wrap(err, "data column sidcars")
}
return nil, roDataColumnSidecars, nil
}
blobSidecars, err := BuildBlobSidecars(block, rawBlobs, proofs)
@@ -468,26 +476,21 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
// broadcastAndReceiveDataColumns handles the broadcasting and reception of data columns sidecars.
func (vs *Server) broadcastAndReceiveDataColumns(
ctx context.Context,
sidecars []*ethpb.DataColumnSidecar,
roSidecars []blocks.RODataColumn,
root [fieldparams.RootLength]byte,
) error {
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(sidecars))
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
eg, _ := errgroup.WithContext(ctx)
for _, sidecar := range sidecars {
roDataColumn, err := blocks.NewRODataColumnWithRoot(sidecar, root)
if err != nil {
return errors.Wrap(err, "new read-only data column with root")
}
for _, roSidecar := range roSidecars {
// We build this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roSidecar)
verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
eg.Go(func() error {
// Compute the subnet index based on the column index.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
subnet := peerdas.ComputeSubnetForDataColumnSidecar(roSidecar.Index)
if err := vs.P2P.BroadcastDataColumnSidecar(root, subnet, sidecar); err != nil {
if err := vs.P2P.BroadcastDataColumnSidecar(subnet, verifiedRODataColumn); err != nil {
return errors.Wrap(err, "broadcast data column")
}

View File

@@ -10,7 +10,7 @@ import (
)
// BuildBlobSidecars given a block, builds the blob sidecars for the block.
func BuildBlobSidecars(blk interfaces.SignedBeaconBlock, blobs [][]byte, kzgProofs [][]byte) ([]*ethpb.BlobSidecar, error) {
func BuildBlobSidecars(blk interfaces.ReadOnlySignedBeaconBlock, blobs [][]byte, kzgProofs [][]byte) ([]*ethpb.BlobSidecar, error) {
if blk.Version() < version.Deneb {
return nil, nil // No blobs before deneb.
}

View File

@@ -154,6 +154,8 @@ go_library(
"@com_github_trailofbits_go_mutexasserts//:go_default_library",
"@io_opentelemetry_go_otel_trace//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
"@org_golang_x_sync//singleflight:go_default_library",
],
)

View File

@@ -3,220 +3,122 @@ package sync
import (
"context"
"fmt"
"slices"
"sync"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
broadcastMissingDataColumnsTimeIntoSlotMin = 1 * time.Second
broadcastMissingDataColumnsSlack = 2 * time.Second
)
// processDataColumnSidecarsFromReconstruction, after a random delay, attempts to reconstruct,
// broadcast and receive missing data column sidecars for the given block root.
// https:github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#reconstruction-and-cross-seeding
func (s *Service) processDataColumnSidecarsFromReconstruction(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
key := fmt.Sprintf("%#x", sidecar.BlockRoot())
if _, err, _ := s.reconstructionSingleFlight.Do(key, func() (interface{}, error) {
const maxReconstructionDelaySec = 2.
// reconstructSaveBroadcastDataColumnSidecars reconstructs, if possible,
// all data column sidecars. Then, it saves missing sidecars to the store.
// After a delay, it broadcasts in the background not seen via gossip
// (but reconstructed) sidecars.
func (s *Service) reconstructSaveBroadcastDataColumnSidecars(
ctx context.Context,
slot primitives.Slot,
proposerIndex primitives.ValidatorIndex,
root [fieldparams.RootLength]byte,
) error {
startTime := time.Now()
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
var wg sync.WaitGroup
// Lock to prevent concurrent reconstructions.
s.reconstructionLock.Lock()
defer s.reconstructionLock.Unlock()
root := sidecar.BlockRoot()
slot := sidecar.Slot()
proposerIndex := sidecar.ProposerIndex()
// Return early if reconstruction is not needed.
if !s.shouldReconstruct(root) {
return nil, nil
}
// Compute the slot start time.
slotStartTime, err := slots.StartTime(s.cfg.clock.GenesisTime(), slot)
if err != nil {
return nil, errors.Wrap(err, "failed to calculate slot start time")
}
// Randomly choose value before starting reconstruction.
randFloat := s.reconstructionRandGen.Float64()
timeIntoSlot := time.Duration(maxReconstructionDelaySec*randFloat) * time.Second
broadcastTime := slotStartTime.Add(timeIntoSlot)
waitingTime := time.Until(broadcastTime)
wg.Add(1)
time.AfterFunc(waitingTime, func() {
defer wg.Done()
// Return early if the context was canceled during the waiting time.
if err := ctx.Err(); err != nil {
return
}
// Return early if reconstruction is not needed anymore.
if !s.shouldReconstruct(root) {
return
}
// Load all the stored data column sidecars for this root.
verifiedSidecars, err := s.cfg.dataColumnStorage.Get(root, nil)
if err != nil {
log.WithError(err).Error("Failed to get data column sidecars")
return
}
// Reconstruct all the data column sidecars.
reconstructedSidecars, err := peerdas.ReconstructDataColumnSidecars(verifiedSidecars)
if err != nil {
log.WithError(err).Error("Failed to reconstruct data column sidecars")
return
}
// Retrieve indices of data column sidecars to sample.
columnIndicesToSample, err := s.columnIndicesToSample()
if err != nil {
log.WithError(err).Error("Failed to get column indices to sample")
return
}
unseenIndices, err := s.broadcastAndReceiveUnseenDataColumnSidecars(ctx, slot, proposerIndex, columnIndicesToSample, reconstructedSidecars)
if err != nil {
log.WithError(err).Error("Failed to broadcast and receive unseen data column sidecars")
return
}
if len(unseenIndices) > 0 {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": slot,
"proposerIndex": proposerIndex,
"count": len(unseenIndices),
"indices": sortedSliceFromMap(unseenIndices),
}).Debug("Reconstructed data column sidecars")
}
})
wg.Wait()
return nil, nil
}); err != nil {
return err
}
return nil
}
// shouldReconstruct returns true if we should attempt to reconstruct the data columns for the given block root.
func (s *Service) shouldReconstruct(root [fieldparams.RootLength]byte) bool {
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Get the columns we store.
storedDataColumns := s.cfg.dataColumnStorage.Summary(root)
storedColumnsCount := storedDataColumns.Count()
numberOfColumns := params.BeaconConfig().NumberOfColumns
// If reconstruction is not possible or if all columns are already stored, exit early.
// Do not reconstruct if we don't have enough columns or if we already have all columns.
if storedColumnsCount < peerdas.MinimumColumnCountToReconstruct() || storedColumnsCount == numberOfColumns {
return nil
return false
}
// Retrieve our local node info.
nodeID := s.cfg.p2p.NodeID()
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount()
if err != nil {
return errors.Wrap(err, "custody group count")
}
samplingSize := max(custodyGroupCount, samplesPerSlot)
localNodeInfo, _, err := peerdas.Info(nodeID, samplingSize)
if err != nil {
return errors.Wrap(err, "peer info")
}
// Load all the possible data column sidecars, to minimize reconstruction time.
verifiedSidecars, err := s.cfg.dataColumnStorage.Get(root, nil)
if err != nil {
return errors.Wrap(err, "get data column sidecars")
}
// Reconstruct all the data column sidecars.
reconstructedSidecars, err := peerdas.ReconstructDataColumnSidecars(verifiedSidecars)
if err != nil {
return errors.Wrap(err, "reconstruct data column sidecars")
}
// Filter reconstructed sidecars to save.
custodyColumns := localNodeInfo.CustodyColumns
toSaveSidecars := make([]blocks.VerifiedRODataColumn, 0, len(custodyColumns))
for _, sidecar := range reconstructedSidecars {
if custodyColumns[sidecar.Index] {
toSaveSidecars = append(toSaveSidecars, sidecar)
}
}
// Save the data column sidecars to the database.
// Note: We do not call `receiveDataColumn`, because it will ignore
// incoming data columns via gossip while we did not broadcast (yet) the reconstructed data columns.
if err := s.cfg.dataColumnStorage.Save(toSaveSidecars); err != nil {
return errors.Wrap(err, "save data column sidecars")
}
slotStartTime, err := slots.StartTime(s.cfg.clock.GenesisTime(), slot)
if err != nil {
return errors.Wrap(err, "failed to calculate slot start time")
}
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": slot,
"fromColumnsCount": storedColumnsCount,
"sinceSlotStartTime": time.Since(slotStartTime),
"reconstructionAndSaveDuration": time.Since(startTime),
}).Debug("Data columns reconstructed and saved")
// Update reconstruction metrics.
dataColumnReconstructionHistogram.Observe(float64(time.Since(startTime).Milliseconds()))
dataColumnReconstructionCounter.Add(float64(len(reconstructedSidecars) - len(verifiedSidecars)))
// Schedule the broadcast.
if err := s.scheduleMissingDataColumnSidecarsBroadcast(ctx, root, proposerIndex, slot); err != nil {
return errors.Wrap(err, "schedule reconstructed data columns broadcast")
}
return nil
}
// scheduleMissingDataColumnSidecarsBroadcast schedules the broadcast of missing
// (aka. not seen via gossip but reconstructed) sidecars.
func (s *Service) scheduleMissingDataColumnSidecarsBroadcast(
ctx context.Context,
root [fieldparams.RootLength]byte,
proposerIndex primitives.ValidatorIndex,
slot primitives.Slot,
) error {
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%x", root),
"slot": slot,
})
// Get the time corresponding to the start of the slot.
genesisTime := s.cfg.clock.GenesisTime()
slotStartTime, err := slots.StartTime(genesisTime, slot)
if err != nil {
return errors.Wrap(err, "failed to calculate slot start time")
}
// Compute the waiting time. This could be negative. In such a case, broadcast immediately.
randFloat := s.reconstructionRandGen.Float64()
timeIntoSlot := broadcastMissingDataColumnsTimeIntoSlotMin + time.Duration(float64(broadcastMissingDataColumnsSlack)*randFloat)
broadcastTime := slotStartTime.Add(timeIntoSlot)
waitingTime := time.Until(broadcastTime)
time.AfterFunc(waitingTime, func() {
// Return early if the context was canceled during the waiting time.
if err := ctx.Err(); err != nil {
return
}
if err := s.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot); err != nil {
log.WithError(err).Error("Failed to broadcast missing data column sidecars")
}
})
return nil
}
func (s *Service) broadcastMissingDataColumnSidecars(
slot primitives.Slot,
proposerIndex primitives.ValidatorIndex,
root [fieldparams.RootLength]byte,
timeIntoSlot time.Duration,
) error {
// Get the node ID.
nodeID := s.cfg.p2p.NodeID()
// Retrieve the local node info.
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount()
if err != nil {
return errors.Wrap(err, "custody group count")
}
localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
return errors.Wrap(err, "peerdas info")
}
// Compute the missing data columns (data columns we should custody but we did not received via gossip.)
missingColumns := make([]uint64, 0, len(localNodeInfo.CustodyColumns))
for column := range localNodeInfo.CustodyColumns {
if !s.hasSeenDataColumnIndex(slot, proposerIndex, column) {
missingColumns = append(missingColumns, column)
}
}
// Return early if there are no missing data columns.
if len(missingColumns) == 0 {
return nil
}
// Load from the store the non received but reconstructed data column.
verifiedRODataColumnSidecars, err := s.cfg.dataColumnStorage.Get(root, missingColumns)
if err != nil {
return errors.Wrap(err, "data column storage get")
}
broadcastedColumns := make([]uint64, 0, len(verifiedRODataColumnSidecars))
for _, verifiedRODataColumn := range verifiedRODataColumnSidecars {
broadcastedColumns = append(broadcastedColumns, verifiedRODataColumn.Index)
// Compute the subnet for this column.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(verifiedRODataColumn.Index)
// Broadcast the missing data column.
if err := s.cfg.p2p.BroadcastDataColumnSidecar(root, subnet, verifiedRODataColumn.DataColumnSidecar); err != nil {
log.WithError(err).Error("Broadcast data column")
}
// Now, we can set the data column as seen.
s.setSeenDataColumnIndex(slot, proposerIndex, verifiedRODataColumn.Index)
}
if logrus.GetLevel() >= logrus.DebugLevel {
// Sort for nice logging.
slices.Sort(broadcastedColumns)
slices.Sort(missingColumns)
log.WithFields(logrus.Fields{
"timeIntoSlot": timeIntoSlot,
"missingColumns": missingColumns,
"broadcasted": broadcastedColumns,
}).Debug("Start broadcasting not seen via gossip but reconstructed data columns")
}
return nil
return true
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
)
@@ -27,9 +26,6 @@ func TestReconstructDataColumns(t *testing.T) {
roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns)))
root, block := roBlock.Root(), roBlock.Block()
slot, proposerIndex := block.Slot(), block.ProposerIndex()
minimumCount := peerdas.MinimumColumnCountToReconstruct()
t.Run("not enough stored sidecars", func(t *testing.T) {
@@ -38,7 +34,7 @@ func TestReconstructDataColumns(t *testing.T) {
require.NoError(t, err)
service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)), WithDataColumnStorage(storage))
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
err = service.processDataColumnSidecarsFromReconstruction(ctx, verifiedRoDataColumns[0])
require.NoError(t, err)
})
@@ -48,7 +44,7 @@ func TestReconstructDataColumns(t *testing.T) {
require.NoError(t, err)
service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)), WithDataColumnStorage(storage))
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
err = service.processDataColumnSidecarsFromReconstruction(ctx, verifiedRoDataColumns[0])
require.NoError(t, err)
})
@@ -72,7 +68,7 @@ func TestReconstructDataColumns(t *testing.T) {
WithChainService(&mockChain.ChainService{}),
)
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
err = service.processDataColumnSidecarsFromReconstruction(ctx, verifiedRoDataColumns[0])
require.NoError(t, err)
expected := make(map[uint64]bool, minimumCount+cgc)
@@ -85,7 +81,7 @@ func TestReconstructDataColumns(t *testing.T) {
expected[i] = true
}
summary := storage.Summary(root)
summary := storage.Summary(roBlock.Root())
actual := summary.Stored()
require.Equal(t, len(expected), len(actual))
@@ -95,83 +91,83 @@ func TestReconstructDataColumns(t *testing.T) {
})
}
func TestBroadcastMissingDataColumnSidecars(t *testing.T) {
const (
cgc = 8
blobCount = 4
timeIntoSlot = 0
)
// func TestBroadcastMissingDataColumnSidecars(t *testing.T) {
// const (
// cgc = 8
// blobCount = 4
// timeIntoSlot = 0
// )
numberOfColumns := params.BeaconConfig().NumberOfColumns
ctx := t.Context()
// numberOfColumns := params.BeaconConfig().NumberOfColumns
// ctx := t.Context()
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
// // Start the trusted setup.
// err := kzg.Start()
// require.NoError(t, err)
roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns)))
// roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
// require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns)))
root, block := roBlock.Root(), roBlock.Block()
slot, proposerIndex := block.Slot(), block.ProposerIndex()
// root, block := roBlock.Root(), roBlock.Block()
// slot, proposerIndex := block.Slot(), block.ProposerIndex()
t.Run("no missing sidecars", func(t *testing.T) {
service := NewService(
ctx,
WithP2P(p2ptest.NewTestP2P(t)),
)
// t.Run("no missing sidecars", func(t *testing.T) {
// service := NewService(
// ctx,
// WithP2P(p2ptest.NewTestP2P(t)),
// )
for _, index := range [...]uint64{1, 17, 19, 42, 75, 87, 102, 117} {
key := computeCacheKey(slot, proposerIndex, index)
service.seenDataColumnCache.Add(slot, key, true)
}
// for _, index := range [...]uint64{1, 17, 19, 42, 75, 87, 102, 117} {
// key := computeCacheKey(slot, proposerIndex, index)
// service.seenDataColumnCache.Add(slot, key, true)
// }
err := service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot)
require.NoError(t, err)
})
// err := service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot)
// require.NoError(t, err)
// })
t.Run("some missing sidecars", func(t *testing.T) {
toSave := make([]blocks.VerifiedRODataColumn, 0, 2)
for _, index := range [...]uint64{42, 87} {
toSave = append(toSave, verifiedRoDataColumns[index])
}
// t.Run("some missing sidecars", func(t *testing.T) {
// toSave := make([]blocks.VerifiedRODataColumn, 0, 2)
// for _, index := range [...]uint64{42, 87} {
// toSave = append(toSave, verifiedRoDataColumns[index])
// }
p2p := p2ptest.NewTestP2P(t)
storage := filesystem.NewEphemeralDataColumnStorage(t)
err := storage.Save(toSave)
require.NoError(t, err)
// p2p := p2ptest.NewTestP2P(t)
// storage := filesystem.NewEphemeralDataColumnStorage(t)
// err := storage.Save(toSave)
// require.NoError(t, err)
service := NewService(
ctx,
WithP2P(p2p),
WithDataColumnStorage(storage),
)
_, _, err = service.cfg.p2p.UpdateCustodyInfo(0, cgc)
require.NoError(t, err)
// service := NewService(
// ctx,
// WithP2P(p2p),
// WithDataColumnStorage(storage),
// )
// _, _, err = service.cfg.p2p.UpdateCustodyInfo(0, cgc)
// require.NoError(t, err)
for _, index := range [...]uint64{1, 17, 19, 102, 117} { // 42, 75 and 87 are missing
key := computeCacheKey(slot, proposerIndex, index)
service.seenDataColumnCache.Add(slot, key, true)
}
// for _, index := range [...]uint64{1, 17, 19, 102, 117} { // 42, 75 and 87 are missing
// key := computeCacheKey(slot, proposerIndex, index)
// service.seenDataColumnCache.Add(slot, key, true)
// }
for _, index := range [...]uint64{42, 75, 87} {
seen := service.hasSeenDataColumnIndex(slot, proposerIndex, index)
require.Equal(t, false, seen)
}
// for _, index := range [...]uint64{42, 75, 87} {
// seen := service.hasSeenDataColumnIndex(slot, proposerIndex, index)
// require.Equal(t, false, seen)
// }
require.Equal(t, false, p2p.BroadcastCalled.Load())
// require.Equal(t, false, p2p.BroadcastCalled.Load())
err = service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot)
require.NoError(t, err)
// err = service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot)
// require.NoError(t, err)
seen := service.hasSeenDataColumnIndex(slot, proposerIndex, 75)
require.Equal(t, false, seen)
// seen := service.hasSeenDataColumnIndex(slot, proposerIndex, 75)
// require.Equal(t, false, seen)
for _, index := range [...]uint64{42, 87} {
seen := service.hasSeenDataColumnIndex(slot, proposerIndex, index)
require.Equal(t, true, seen)
}
// for _, index := range [...]uint64{42, 87} {
// seen := service.hasSeenDataColumnIndex(slot, proposerIndex, index)
// require.Equal(t, true, seen)
// }
require.Equal(t, true, p2p.BroadcastCalled.Load())
})
}
// require.Equal(t, true, p2p.BroadcastCalled.Load())
// })
// }

View File

@@ -50,6 +50,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
"golang.org/x/sync/singleflight"
)
var _ runtime.Service = (*Service)(nil)
@@ -170,8 +171,9 @@ type Service struct {
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
newColumnsVerifier verification.NewDataColumnsVerifier
columnSidecarsExecSingleFlight singleflight.Group
reconstructionSingleFlight singleflight.Group
availableBlocker coverage.AvailableBlocker
reconstructionLock sync.Mutex
reconstructionRandGen *rand.Rand
trackedValidatorsCache *cache.TrackedValidatorsCache
ctxMap ContextByteVersions

View File

@@ -5,16 +5,20 @@ import (
"fmt"
"os"
"path"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition/interop"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/io/file"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
@@ -37,7 +41,12 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return err
}
go s.processSidecarsFromExecution(ctx, signed)
roBlock, err := blocks.NewROBlockWithRoot(signed, root)
if err != nil {
return errors.Wrap(err, "new ro block with root")
}
go s.processSidecarsFromExecutionFromBlock(ctx, roBlock)
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
if blockchain.IsInvalidBlock(err) {
@@ -61,103 +70,21 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return err
}
// processSidecarsFromExecution retrieves (if available) sidecars data from the execution client,
// processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecution(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
if block.Version() >= version.Fulu {
s.processDataColumnSidecarsFromExecution(ctx, block)
return
}
if block.Version() >= version.Deneb {
s.processBlobSidecarsFromExecution(ctx, block)
return
}
}
// processDataColumnSidecarsFromExecution retrieves (if available) data column sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, roSignedBlock interfaces.ReadOnlySignedBeaconBlock) {
block := roSignedBlock.Block()
log := log.WithFields(logrus.Fields{
"slot": block.Slot(),
"proposerIndex": block.ProposerIndex(),
})
kzgCommitments, err := block.Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to read commitments from block")
return
}
if len(kzgCommitments) == 0 {
// No blobs to reconstruct.
return
}
blockRoot, err := block.HashTreeRoot()
if err != nil {
log.WithError(err).Error("Failed to calculate block root")
return
}
log = log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot))
if s.cfg.dataColumnStorage == nil {
log.Warning("Data column storage is not enabled, skip saving data column, but continue to reconstruct and broadcast data column")
}
// When this function is called, it's from the time when the block is received, so in almost all situations we need to get the data column from EL instead of the blob storage.
sidecars, err := s.cfg.executionReconstructor.ReconstructDataColumnSidecars(ctx, roSignedBlock, blockRoot)
if err != nil {
log.WithError(err).Debug("Cannot reconstruct data column sidecars after receiving the block")
return
}
// Return early if no blobs are retrieved from the EL.
if len(sidecars) == 0 {
return
}
nodeID := s.cfg.p2p.NodeID()
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount()
if err != nil {
log.WithError(err).Error("Failed to get custody group count")
return
}
info, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
log.WithError(err).Error("Failed to get peer info")
return
}
blockSlot := block.Slot()
proposerIndex := block.ProposerIndex()
// Broadcast and save data column sidecars to custody but not yet received.
sidecarCount := uint64(len(sidecars))
for columnIndex := range info.CustodyColumns {
log := log.WithField("columnIndex", columnIndex)
if columnIndex >= sidecarCount {
log.Error("Column custody index out of range - should never happen")
continue
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) {
if roBlock.Version() >= version.Fulu {
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil {
log.WithError(err).Error("Failed to process data column sidecars from execution")
return
}
if s.hasSeenDataColumnIndex(blockSlot, proposerIndex, columnIndex) {
continue
}
return
}
sidecar := sidecars[columnIndex]
if err := s.cfg.p2p.BroadcastDataColumnSidecar(blockRoot, sidecar.Index, sidecar.DataColumnSidecar); err != nil {
log.WithError(err).Error("Failed to broadcast data column")
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
log.WithError(err).Error("Failed to receive data column")
}
if roBlock.Version() >= version.Deneb {
s.processBlobSidecarsFromExecution(ctx, roBlock)
return
}
}
@@ -230,6 +157,166 @@ func (s *Service) processBlobSidecarsFromExecution(ctx context.Context, block in
}
}
// processDataColumnSidecarsFromExecution retrieves (if available) data column sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, source peerdas.ConstructionPopulator) error {
key := fmt.Sprintf("%#x", source.Root())
if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (interface{}, error) {
const delay = 250 * time.Millisecond
secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
numberOfColumns := params.BeaconConfig().NumberOfColumns
commitments, err := source.Commitments()
if err != nil {
return nil, errors.Wrap(err, "blob kzg commitments")
}
// Exit early if there are no commitments.
if len(commitments) == 0 {
return nil, nil
}
// Retrieve the indices of sidecars we should sample.
columnIndicesToSample, err := s.columnIndicesToSample()
if err != nil {
return nil, errors.Wrap(err, "column indices to sample")
}
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
for iteration := uint64(0); ; /*no stop condition*/ iteration++ {
// Exit early if all sidecars to sample have been seen.
if s.haveAllSidecarsBeenSeen(source.Slot(), source.ProposerIndex(), columnIndicesToSample) {
return nil, nil
}
// Try to reconstruct data column constructedSidecars from the execution client.
constructedSidecars, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, source)
if err != nil {
return nil, errors.Wrap(err, "reconstruct data column sidecars")
}
// No sidecars are retrieved from the EL, retry later
sidecarCount := uint64(len(constructedSidecars))
if sidecarCount == 0 {
if ctx.Err() != nil {
return nil, ctx.Err()
}
time.Sleep(delay)
continue
}
// Boundary check.
if sidecarCount != numberOfColumns {
return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", sidecarCount, numberOfColumns)
}
unseenIndices, err := s.broadcastAndReceiveUnseenDataColumnSidecars(ctx, source.Slot(), source.ProposerIndex(), columnIndicesToSample, constructedSidecars)
if err != nil {
return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars")
}
if len(unseenIndices) > 0 {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
"proposerIndex": source.ProposerIndex(),
"iteration": iteration,
"type": source.Type(),
"count": len(unseenIndices),
"indices": sortedSliceFromMap(unseenIndices),
}).Debug("Constructed data column sidecars from the execution client")
}
return nil, nil
}
}); err != nil {
return err
}
return nil
}
// broadcastAndReceiveUnseenDataColumnSidecars broadcasts and receives unseen data column sidecars.
func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
ctx context.Context,
slot primitives.Slot,
proposerIndex primitives.ValidatorIndex,
neededIndices map[uint64]bool,
sidecars []blocks.VerifiedRODataColumn,
) (map[uint64]bool, error) {
// Compute sidecars we need to broadcast and receive.
unseenSidecars := make([]blocks.VerifiedRODataColumn, 0, len(sidecars))
unseenIndices := make(map[uint64]bool, len(sidecars))
for _, sidecar := range sidecars {
// Skip already seen data column sidecars.
if s.hasSeenDataColumnIndex(slot, proposerIndex, sidecar.Index) {
continue
}
if neededIndices[sidecar.Index] {
unseenSidecars = append(unseenSidecars, sidecar)
}
}
// Broadcast all the data column sidecars we reconstructed but did not see via gossip.
for _, sidecar := range unseenSidecars {
// Compute the subnet for this data column sidecar.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
// Broadcast the data column sidecar.
if err := s.cfg.p2p.BroadcastDataColumnSidecar(subnet, sidecar); err != nil {
// Don't return on error on broadcast failure, just log it.
log.WithError(err).Error("Broadcast data column")
}
}
// Receive data column sidecars.
if err := s.receiveDataColumnSidecars(ctx, unseenSidecars); err != nil {
return nil, errors.Wrap(err, "receive data column sidecars")
}
return unseenIndices, nil
}
// haveAllSidecarsBeenSeen checks if all sidecars for the given slot, proposer index, and data column indices have been seen.
func (s *Service) haveAllSidecarsBeenSeen(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, indices map[uint64]bool) bool {
for index := range indices {
if !s.hasSeenDataColumnIndex(slot, proposerIndex, index) {
return false
}
}
return true
}
// columnIndicesToSample returns the data column indices we should sample for the node.
func (s *Service) columnIndicesToSample() (map[uint64]bool, error) {
// Retrieve our node ID.
nodeID := s.cfg.p2p.NodeID()
// Get the custody group sampling size for the node.
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount()
if err != nil {
return nil, errors.Wrap(err, "custody group count")
}
// Compute the sampling size.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
samplingSize := max(samplesPerSlot, custodyGroupCount)
// Get the peer info for the node.
peerInfo, _, err := peerdas.Info(nodeID, samplingSize)
if err != nil {
return nil, errors.Wrap(err, "peer info")
}
return peerInfo.CustodyColumns, nil
}
// WriteInvalidBlockToDisk as a block ssz. Writes to temp directory.
func saveInvalidBlockToTemp(block interfaces.ReadOnlySignedBeaconBlock) {
if !features.Get().SaveInvalidBlock {

View File

@@ -1,7 +1,6 @@
package sync
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
@@ -135,7 +134,7 @@ func TestService_BeaconBlockSubscribe_UndefinedEeError(t *testing.T) {
require.Equal(t, 1, len(s.seenBlockCache.Keys()))
}
func TestReconstructAndBroadcastBlobs(t *testing.T) {
func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
t.Run("blobs", func(t *testing.T) {
rob, err := blocks.NewROBlob(
&ethpb.BlobSidecar{
@@ -158,6 +157,9 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
roBlock, err := blocks.NewROBlock(sb)
require.NoError(t, err)
tests := []struct {
name string
blobSidecars []blocks.VerifiedROBlob
@@ -192,7 +194,7 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
},
seenBlobCache: lruwrpr.New(1),
}
s.processSidecarsFromExecution(context.Background(), sb)
s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
})
}
@@ -253,7 +255,7 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
name: "Constructed 128 data columns with all blobs",
blobCount: 1,
dataColumnSidecars: allColumns,
expectedDataColumnCount: 4, // default is 4
expectedDataColumnCount: 8,
},
}
@@ -261,10 +263,10 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := Service{
cfg: &config{
p2p: mockp2p.NewTestP2P(t),
chain: chainService,
clock: startup.NewClock(time.Now(), [32]byte{}),
blobStorage: filesystem.NewEphemeralBlobStorage(t),
p2p: mockp2p.NewTestP2P(t),
chain: chainService,
clock: startup.NewClock(time.Now(), [32]byte{}),
dataColumnStorage: filesystem.NewEphemeralDataColumnStorage(t),
executionReconstructor: &mockExecution.EngineClient{
DataColumnSidecars: tt.dataColumnSidecars,
},
@@ -288,10 +290,91 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
s.processSidecarsFromExecution(context.Background(), sb)
roBlock, err := blocks.NewROBlock(sb)
require.NoError(t, err)
s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns))
})
}
})
}
func TestHaveAllSidecarsBeenSeen(t *testing.T) {
const (
slot = 42
proposerIndex = 1664
)
service := NewService(t.Context(), WithP2P(mockp2p.NewTestP2P(t)))
service.initCaches()
service.setSeenDataColumnIndex(slot, proposerIndex, 1)
service.setSeenDataColumnIndex(slot, proposerIndex, 3)
testCases := []struct {
name string
toSample map[uint64]bool
expected bool
}{
{
name: "all sidecars seen",
toSample: map[uint64]bool{1: true, 3: true},
expected: true,
},
{
name: "not all sidecars seen",
toSample: map[uint64]bool{1: true, 2: true, 3: true},
expected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := service.haveAllSidecarsBeenSeen(slot, proposerIndex, tc.toSample)
require.Equal(t, tc.expected, actual)
})
}
}
func TestColumnIndicesToSample(t *testing.T) {
const earliestAvailableSlot = 0
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.SamplesPerSlot = 4
params.OverrideBeaconConfig(cfg)
testCases := []struct {
name string
custodyGroupCount uint64
expected map[uint64]bool
}{
{
name: "custody group count lower than samples per slot",
custodyGroupCount: 3,
expected: map[uint64]bool{1: true, 17: true, 87: true, 102: true},
},
{
name: "custody group count higher than samples per slot",
custodyGroupCount: 5,
expected: map[uint64]bool{1: true, 17: true, 75: true, 87: true, 102: true},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p2p := mockp2p.NewTestP2P(t)
_, _, err := p2p.UpdateCustodyInfo(earliestAvailableSlot, tc.custodyGroupCount)
require.NoError(t, err)
service := NewService(t.Context(), WithP2P(p2p))
actual, err := service.columnIndicesToSample()
require.NoError(t, err)
require.Equal(t, len(tc.expected), len(actual))
for index := range tc.expected {
require.Equal(t, true, actual[index])
}
})
}
}

View File

@@ -6,14 +6,19 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
opfeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
// dataColumnSubscriber is the subscriber function for data column sidecars.
func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) error {
var wg errgroup.Group
sidecar, ok := msg.(blocks.VerifiedRODataColumn)
if !ok {
return fmt.Errorf("message was not type blocks.VerifiedRODataColumn, type=%T", msg)
@@ -23,34 +28,57 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return errors.Wrap(err, "receive data column sidecar")
}
slot := sidecar.Slot()
proposerIndex := sidecar.ProposerIndex()
root := sidecar.BlockRoot()
wg.Go(func() error {
if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil {
return errors.Wrap(err, "process data column sidecars from reconstruction")
}
if err := s.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root); err != nil {
return errors.Wrap(err, "reconstruct/save/broadcast data column sidecars")
return nil
})
wg.Go(func() error {
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromSidecar(sidecar)); err != nil {
return errors.Wrap(err, "process data column sidecars from execution")
}
return nil
})
if err := wg.Wait(); err != nil {
return err
}
return nil
}
// receiveDataColumnSidecar receives a single data column sidecar: marks it as seen and saves it to the chain.
// Do not loop over this function to receive multiple sidecars, use receiveDataColumnSidecars instead.
func (s *Service) receiveDataColumnSidecar(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
slot := sidecar.SignedBlockHeader.Header.Slot
proposerIndex := sidecar.SignedBlockHeader.Header.ProposerIndex
columnIndex := sidecar.Index
return s.receiveDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{sidecar})
}
s.setSeenDataColumnIndex(slot, proposerIndex, columnIndex)
// receiveDataColumnSidecars receives multiple data column sidecars: marks them as seen and saves them to the chain.
func (s *Service) receiveDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error {
for _, sidecar := range sidecars {
slot := sidecar.SignedBlockHeader.Header.Slot
proposerIndex := sidecar.SignedBlockHeader.Header.ProposerIndex
columnIndex := sidecar.Index
if err := s.cfg.chain.ReceiveDataColumn(sidecar); err != nil {
s.setSeenDataColumnIndex(slot, proposerIndex, columnIndex)
}
if err := s.cfg.chain.ReceiveDataColumns(sidecars); err != nil {
return errors.Wrap(err, "receive data column")
}
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.DataColumnSidecarReceived,
Data: &opfeed.DataColumnSidecarReceivedData{
DataColumn: &sidecar,
},
})
for _, sidecar := range sidecars {
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.DataColumnSidecarReceived,
Data: &opfeed.DataColumnSidecarReceivedData{
DataColumn: &sidecar,
},
})
}
return nil
}

View File

@@ -30,17 +30,10 @@ func GenerateTestDataColumns(t *testing.T, parent [fieldparams.RootLength]byte,
}
cellsAndProofs := util.GenerateCellsAndProofs(t, blobs)
dataColumnSidecars, err := peerdas.DataColumnSidecars(roBlock, cellsAndProofs)
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
roDataColumns := make([]blocks.RODataColumn, 0, len(dataColumnSidecars))
for i := range dataColumnSidecars {
roDataColumn, err := blocks.NewRODataColumn(dataColumnSidecars[i])
require.NoError(t, err)
roDataColumns = append(roDataColumns, roDataColumn)
}
return roDataColumns
return roDataColumnSidecars
}
func TestColumnSatisfyRequirement(t *testing.T) {

View File

@@ -0,0 +1,3 @@
### Added
- Add retry logic when GetBlobsV2 is called.
- Call GetBlobsV2 as soon as we receive the first data column sidecar or block

View File

@@ -283,6 +283,7 @@ exceptions:
- compute_fork_digest#fulu
- compute_matrix#fulu
- get_blob_parameters#fulu
- get_data_column_sidecars_from_block#fulu
- get_data_column_sidecars_from_column_sidecar#fulu
- get_extended_sample_count#fulu
- recover_matrix#fulu

View File

@@ -2076,7 +2076,7 @@
- name: get_data_column_sidecars
sources:
- file: beacon-chain/core/peerdas/das_core.go
- file: beacon-chain/core/peerdas/validator.go
search: func DataColumnSidecars(
spec: |
<spec fn="get_data_column_sidecars" fork="fulu" hash="317fc596">
@@ -2113,60 +2113,6 @@
return sidecars
</spec>
- name: get_data_column_sidecars_from_block
sources:
- file: beacon-chain/core/peerdas/das_core.go
search: func dataColumnsSidecars(
spec: |
<spec fn="get_data_column_sidecars_from_block" fork="fulu" hash="02ffae23">
def get_data_column_sidecars_from_block(
signed_block: SignedBeaconBlock,
cells_and_kzg_proofs: Sequence[
Tuple[Vector[Cell, CELLS_PER_EXT_BLOB], Vector[KZGProof, CELLS_PER_EXT_BLOB]]
],
) -> Sequence[DataColumnSidecar]:
"""
Given a signed block and the cells/proofs associated with each blob in the
block, assemble the sidecars which can be distributed to peers.
"""
blob_kzg_commitments = signed_block.message.body.blob_kzg_commitments
signed_block_header = compute_signed_block_header(signed_block)
kzg_commitments_inclusion_proof = compute_merkle_proof(
signed_block.message.body,
get_generalized_index(BeaconBlockBody, "blob_kzg_commitments"),
)
return get_data_column_sidecars(
signed_block_header,
blob_kzg_commitments,
kzg_commitments_inclusion_proof,
cells_and_kzg_proofs,
)
</spec>
- name: get_data_column_sidecars_from_column_sidecar
sources: []
spec: |
<spec fn="get_data_column_sidecars_from_column_sidecar" fork="fulu" hash="4304cdec">
def get_data_column_sidecars_from_column_sidecar(
sidecar: DataColumnSidecar,
cells_and_kzg_proofs: Sequence[
Tuple[Vector[Cell, CELLS_PER_EXT_BLOB], Vector[KZGProof, CELLS_PER_EXT_BLOB]]
],
) -> Sequence[DataColumnSidecar]:
"""
Given a DataColumnSidecar and the cells/proofs associated with each blob corresponding
to the commitments it contains, assemble all sidecars for distribution to peers.
"""
assert len(cells_and_kzg_proofs) == len(sidecar.kzg_commitments)
return get_data_column_sidecars(
sidecar.signed_block_header,
sidecar.kzg_commitments,
sidecar.kzg_commitments_inclusion_proof,
cells_and_kzg_proofs,
)
</spec>
- name: get_domain
sources:
- file: beacon-chain/core/signing/domain.go

View File

@@ -129,17 +129,6 @@ func GenerateTestFuluBlockWithSidecars(t *testing.T, blobCount int, options ...F
block.Block.Body.BlobKzgCommitments = commitments
body, err := blocks.NewBeaconBlockBody(block.Block.Body)
require.NoError(t, err)
inclusion := make([][][]byte, blobCount)
for i := range blobCount {
proof, err := blocks.MerkleProofKZGCommitment(body, i)
require.NoError(t, err)
inclusion[i] = proof
}
if generator.sign {
epoch := slots.ToEpoch(block.Block.Slot)
fork := params.ForkFromConfig(params.BeaconConfig(), epoch)
@@ -159,15 +148,13 @@ func GenerateTestFuluBlockWithSidecars(t *testing.T, blobCount int, options ...F
cellsAndProofs := GenerateCellsAndProofs(t, blobs)
sidecars, err := peerdas.DataColumnSidecars(signedBeaconBlock, cellsAndProofs)
rob, err := blocks.NewROBlockWithRoot(signedBeaconBlock, root)
require.NoError(t, err)
roSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(rob))
require.NoError(t, err)
roSidecars := make([]blocks.RODataColumn, 0, len(sidecars))
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(sidecars))
for _, sidecar := range sidecars {
roSidecar, err := blocks.NewRODataColumnWithRoot(sidecar, root)
require.NoError(t, err)
verifiedRoSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
for _, roSidecar := range roSidecars {
roVerifiedSidecar := blocks.NewVerifiedRODataColumn(roSidecar)
roSidecars = append(roSidecars, roSidecar)