beaconBlockSubscriber: Implement data column sidecars reconstruction with data retrieved from the execution client when receiving a block via gossip. (#15483)

This commit is contained in:
Manu NALEPA
2025-07-16 08:00:23 +02:00
committed by GitHub
parent 3def16caaa
commit d8fedacc26
4 changed files with 263 additions and 65 deletions

View File

@@ -682,7 +682,7 @@ func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, signedROBlo
// Return early if nothing is returned from the EL.
if len(blobAndProofV2s) == 0 {
log.Debug("No blobs returned from EL")
log.Debug("No blobs returned from execution client")
return nil, nil
}
@@ -715,7 +715,7 @@ func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, signedROBlo
verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
}
log.Debug("Data columns successfully reconstructed from the execution client.")
log.Debug("Data columns successfully reconstructed from the execution client")
return verifiedRODataColumns, nil
}

View File

@@ -7,6 +7,7 @@ import (
"path"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition/interop"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -14,6 +15,7 @@ import (
"github.com/OffchainLabs/prysm/v6/io/file"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
@@ -35,7 +37,7 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return err
}
go s.reconstructAndBroadcastBlobs(ctx, signed)
go s.processSidecarsFromExecution(ctx, signed)
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
if blockchain.IsInvalidBlock(err) {
@@ -59,14 +61,108 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return err
}
// reconstructAndBroadcastBlobs processes and broadcasts blob sidecars for a given beacon block.
// This function reconstructs the blob sidecars from the EL using the block's KZG commitments,
// broadcasts the reconstructed blobs over P2P, and saves them into the blob storage.
func (s *Service) reconstructAndBroadcastBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
if block.Version() < version.Deneb {
// processSidecarsFromExecution retrieves (if available) sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecution(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
if block.Version() >= version.Fulu {
s.processDataColumnSidecarsFromExecution(ctx, block)
return
}
if block.Version() >= version.Deneb {
s.processBlobSidecarsFromExecution(ctx, block)
return
}
}
// processDataColumnSidecarsFromExecution retrieves (if available) data column sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, roSignedBlock interfaces.ReadOnlySignedBeaconBlock) {
block := roSignedBlock.Block()
log := log.WithFields(logrus.Fields{
"slot": block.Slot(),
"proposerIndex": block.ProposerIndex(),
})
kzgCommitments, err := block.Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to read commitments from block")
return
}
if len(kzgCommitments) == 0 {
// No blobs to reconstruct.
return
}
blockRoot, err := block.HashTreeRoot()
if err != nil {
log.WithError(err).Error("Failed to calculate block root")
return
}
log = log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot))
if s.cfg.dataColumnStorage == nil {
log.Warning("Data column storage is not enabled, skip saving data column, but continue to reconstruct and broadcast data column")
}
// When this function is called, it's from the time when the block is received, so in almost all situations we need to get the data column from EL instead of the blob storage.
sidecars, err := s.cfg.executionReconstructor.ReconstructDataColumnSidecars(ctx, roSignedBlock, blockRoot)
if err != nil {
log.WithError(err).Debug("Cannot reconstruct data column sidecars after receiving the block")
return
}
// Return early if no blobs are retrieved from the EL.
if len(sidecars) == 0 {
return
}
nodeID := s.cfg.p2p.NodeID()
s.cfg.custodyInfo.Mut.RLock()
defer s.cfg.custodyInfo.Mut.RUnlock()
groupCount := s.cfg.custodyInfo.ActualGroupCount()
info, _, err := peerdas.Info(nodeID, groupCount)
if err != nil {
log.WithError(err).Error("Failed to get peer info")
return
}
blockSlot := block.Slot()
proposerIndex := block.ProposerIndex()
// Broadcast and save data columns sidecars to custody but not yet received.
sidecarCount := uint64(len(sidecars))
for columnIndex := range info.CustodyColumns {
log := log.WithField("columnIndex", columnIndex)
if columnIndex >= sidecarCount {
log.Error("Column custody index out of range - should never happen")
continue
}
if s.hasSeenDataColumnIndex(blockSlot, proposerIndex, columnIndex) {
continue
}
sidecar := sidecars[columnIndex]
if err := s.cfg.p2p.BroadcastDataColumn(blockRoot, sidecar.Index, sidecar.DataColumnSidecar); err != nil {
log.WithError(err).Error("Failed to broadcast data column")
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
log.WithError(err).Error("Failed to receive data column")
}
}
}
// processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processBlobSidecarsFromExecution(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
startTime, err := slots.StartTime(s.cfg.chain.GenesisTime(), block.Block().Slot())
if err != nil {
log.WithError(err).Error("Failed to convert slot to time")

View File

@@ -1,10 +1,13 @@
package sync
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
chainMock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
dbtest "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/execution"
@@ -13,7 +16,9 @@ import (
mockp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
lruwrpr "github.com/OffchainLabs/prysm/v6/cache/lru"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
@@ -132,63 +137,158 @@ func TestService_BeaconBlockSubscribe_UndefinedEeError(t *testing.T) {
}
func TestReconstructAndBroadcastBlobs(t *testing.T) {
rob, err := blocks.NewROBlob(
&ethpb.BlobSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
},
Signature: []byte("signature"),
},
})
require.NoError(t, err)
chainService := &chainMock.ChainService{
Genesis: time.Now(),
}
b := util.NewBeaconBlockDeneb()
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
tests := []struct {
name string
blobSidecars []blocks.VerifiedROBlob
expectedBlobCount int
}{
{
name: "Constructed 0 blobs",
blobSidecars: nil,
expectedBlobCount: 0,
},
{
name: "Constructed 6 blobs",
blobSidecars: []blocks.VerifiedROBlob{
{ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob},
},
expectedBlobCount: 6,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := Service{
cfg: &config{
p2p: mockp2p.NewTestP2P(t),
chain: chainService,
clock: startup.NewClock(time.Now(), [32]byte{}),
blobStorage: filesystem.NewEphemeralBlobStorage(t),
executionReconstructor: &mockExecution.EngineClient{
BlobSidecars: tt.blobSidecars,
t.Run("blobs", func(t *testing.T) {
rob, err := blocks.NewROBlob(
&ethpb.BlobSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
},
operationNotifier: &chainMock.MockOperationNotifier{},
Signature: []byte("signature"),
},
seenBlobCache: lruwrpr.New(1),
}
s.reconstructAndBroadcastBlobs(t.Context(), sb)
require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
})
}
})
require.NoError(t, err)
chainService := &chainMock.ChainService{
Genesis: time.Now(),
}
b := util.NewBeaconBlockDeneb()
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
tests := []struct {
name string
blobSidecars []blocks.VerifiedROBlob
expectedBlobCount int
}{
{
name: "Constructed 0 blobs",
blobSidecars: nil,
expectedBlobCount: 0,
},
{
name: "Constructed 6 blobs",
blobSidecars: []blocks.VerifiedROBlob{
{ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob},
},
expectedBlobCount: 6,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := Service{
cfg: &config{
p2p: mockp2p.NewTestP2P(t),
chain: chainService,
clock: startup.NewClock(time.Now(), [32]byte{}),
blobStorage: filesystem.NewEphemeralBlobStorage(t),
executionReconstructor: &mockExecution.EngineClient{
BlobSidecars: tt.blobSidecars,
},
operationNotifier: &chainMock.MockOperationNotifier{},
},
seenBlobCache: lruwrpr.New(1),
}
s.processSidecarsFromExecution(context.Background(), sb)
require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
})
}
})
t.Run("data columns", func(t *testing.T) {
// load trusted setup
err := kzg.Start()
require.NoError(t, err)
// Setup right fork epoch
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.CapellaForkEpoch = 0
cfg.DenebForkEpoch = 0
cfg.ElectraForkEpoch = 0
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
chainService := &chainMock.ChainService{
Genesis: time.Now(),
}
allColumns := make([]blocks.VerifiedRODataColumn, 128)
for i := range allColumns {
rod, err := blocks.NewRODataColumn(
&ethpb.DataColumnSidecar{
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
ProposerIndex: primitives.ValidatorIndex(123),
Slot: primitives.Slot(123),
},
Signature: []byte("signature"),
},
Index: uint64(i),
})
require.NoError(t, err)
allColumns[i] = blocks.VerifiedRODataColumn{RODataColumn: rod}
}
tests := []struct {
name string
dataColumnSidecars []blocks.VerifiedRODataColumn
blobCount int
expectedDataColumnCount int
}{
{
name: "Constructed 0 data columns with no blobs",
blobCount: 0,
dataColumnSidecars: nil,
expectedDataColumnCount: 0,
},
{
name: "Constructed 128 data columns with all blobs",
blobCount: 1,
dataColumnSidecars: allColumns,
expectedDataColumnCount: 4, // default is 4
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := Service{
cfg: &config{
p2p: mockp2p.NewTestP2P(t),
chain: chainService,
clock: startup.NewClock(time.Now(), [32]byte{}),
blobStorage: filesystem.NewEphemeralBlobStorage(t),
executionReconstructor: &mockExecution.EngineClient{
DataColumnSidecars: tt.dataColumnSidecars,
},
operationNotifier: &chainMock.MockOperationNotifier{},
custodyInfo: &peerdas.CustodyInfo{},
},
seenDataColumnCache: newSlotAwareCache(1),
}
kzgCommitments := make([][]byte, 0, tt.blobCount)
for range tt.blobCount {
kzgCommitment := make([]byte, 48)
kzgCommitments = append(kzgCommitments, kzgCommitment)
}
b := util.NewBeaconBlockFulu()
b.Block.Body.BlobKzgCommitments = kzgCommitments
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
s.processSidecarsFromExecution(context.Background(), sb)
require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns))
})
}
})
}

View File

@@ -0,0 +1,2 @@
### Added
- Implement data column sidecars reconstruction with data retrieved from the execution client when receiving a block via gossip.