PeerDAS: Implement reconstruct. (#15454)

* PeerDAS: Implement reconstruct.

* Fix Preston's comment.

* Fix Preston's comment.
This commit is contained in:
Manu NALEPA
2025-07-02 21:02:55 +02:00
committed by GitHub
parent 74c9586c66
commit b1ac8209b2
12 changed files with 550 additions and 14 deletions

View File

@@ -38,11 +38,14 @@ const (
// SingleAttReceived is sent after a single attestation object is received from gossip or rpc
SingleAttReceived = 9
// DataColumnSidecarReceived is sent after a data column sidecar is received from gossip or rpc.
DataColumnSidecarReceived = 10
// BlockGossipReceived is sent after a block has been received from gossip or API that passes validation rules.
BlockGossipReceived = 10
BlockGossipReceived = 11
// DataColumnReceived is sent after a data column has been seen after gossip validation rules.
DataColumnReceived = 11
DataColumnReceived = 12
)
// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
@@ -94,6 +97,11 @@ type SingleAttReceivedData struct {
Attestation ethpb.Att
}
// DataColumnSidecarReceivedData is the data sent with DataColumnSidecarReceived events.
type DataColumnSidecarReceivedData struct {
DataColumn *blocks.VerifiedRODataColumn
}
// BlockGossipReceivedData is the data sent with BlockGossipReceived events.
type BlockGossipReceivedData struct {
// SignedBlock is the block that was received.

View File

@@ -886,6 +886,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithDataColumnStorage(b.DataColumnStorage),
regularsync.WithVerifierWaiter(b.verifyInitWaiter),
regularsync.WithAvailableBlocker(bFillStore),
regularsync.WithCustodyInfo(b.custodyInfo),
regularsync.WithSlasherEnabled(b.slasherEnabled),
regularsync.WithLightClientStore(b.lcStore),
)

View File

@@ -7,6 +7,7 @@ go_library(
"block_batcher.go",
"broadcast_bls_changes.go",
"context.go",
"data_columns_reconstruct.go",
"deadlines.go",
"decode_pubsub.go",
"doc.go",
@@ -40,6 +41,7 @@ go_library(
"subscriber_beacon_blocks.go",
"subscriber_blob_sidecar.go",
"subscriber_bls_to_execution_change.go",
"subscriber_data_column_sidecar.go",
"subscriber_handlers.go",
"subscriber_light_client.go",
"subscriber_sync_committee_message.go",
@@ -78,6 +80,7 @@ go_library(
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/light-client:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/core/transition/interop:go_default_library",
@@ -162,6 +165,7 @@ go_test(
"block_batcher_test.go",
"broadcast_bls_changes_test.go",
"context_test.go",
"data_columns_reconstruct_test.go",
"decode_pubsub_test.go",
"error_test.go",
"fork_watcher_test.go",
@@ -207,6 +211,7 @@ go_test(
deps = [
"//async/abool:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
@@ -214,6 +219,7 @@ go_test(
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/light-client:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",

View File

@@ -0,0 +1,208 @@
package sync
import (
"context"
"fmt"
"slices"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
broadcastMissingDataColumnsTimeIntoSlotMin = 1 * time.Second
broadcastMissingDataColumnsSlack = 2 * time.Second
)
// reconstructSaveBroadcastDataColumnSidecars reconstructs if possible and
// needed all data column sidecars. Then, it saves into the store missing
// sidecars. After a delay, it broadcasts in the background not seen via gossip
// (but reconstructed) sidecars.
func (s *Service) reconstructSaveBroadcastDataColumnSidecars(
ctx context.Context,
slot primitives.Slot,
proposerIndex primitives.ValidatorIndex,
root [fieldparams.RootLength]byte,
) error {
startTime := time.Now()
// Get the columns we store.
storedDataColumns := s.cfg.dataColumnStorage.Summary(root)
storedColumnsCount := storedDataColumns.Count()
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Lock to prevent concurrent reconstructions.
s.reconstructionLock.Lock()
defer s.reconstructionLock.Unlock()
// If reconstruction is not possible or if all columns are already stored, exit early.
if storedColumnsCount < peerdas.MinimumColumnsCountToReconstruct() || storedColumnsCount == numberOfColumns {
return nil
}
// Retrieve our local node info.
nodeID := s.cfg.p2p.NodeID()
custodyGroupCount := s.cfg.custodyInfo.ActualGroupCount()
localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
return errors.Wrap(err, "peer info")
}
// Load all the possible data columns sidecars, to minimize reconstruction time.
verifiedSidecars, err := s.cfg.dataColumnStorage.Get(root, nil)
if err != nil {
return errors.Wrap(err, "get data column sidecars")
}
// Reconstruct all the data column sidecars.
reconstructedSidecars, err := peerdas.ReconstructDataColumnSidecars(verifiedSidecars)
if err != nil {
return errors.Wrap(err, "reconstruct data column sidecars")
}
// Filter reconstructed sidecars to save.
custodyColumns := localNodeInfo.CustodyColumns
toSaveSidecars := make([]blocks.VerifiedRODataColumn, 0, len(custodyColumns))
for _, sidecar := range reconstructedSidecars {
if custodyColumns[sidecar.Index] {
toSaveSidecars = append(toSaveSidecars, sidecar)
}
}
// Save the data columns sidecars in the database.
// Note: We do not call `receiveDataColumn`, because it will ignore
// incoming data columns via gossip while we did not broadcast (yet) the reconstructed data columns.
if err := s.cfg.dataColumnStorage.Save(toSaveSidecars); err != nil {
return errors.Wrap(err, "save data column sidecars")
}
// Update reconstruction metrics
dataColumnReconstructionHistogram.Observe(float64(time.Since(startTime).Milliseconds()))
dataColumnReconstructionCounter.Add(float64(len(reconstructedSidecars) - len(verifiedSidecars)))
// Schedule the broadcast.
if err := s.scheduleMissingDataColumnSidecarsBroadcast(ctx, root, proposerIndex, slot); err != nil {
return errors.Wrap(err, "schedule reconstructed data columns broadcast")
}
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": slot,
"fromColumnsCount": storedColumnsCount,
}).Debug("Data columns reconstructed and saved")
return nil
}
// scheduleMissingDataColumnSidecarsBroadcast schedules the broadcast of missing
// (aka. not seen via gossip but reconstructed) sidecars.
func (s *Service) scheduleMissingDataColumnSidecarsBroadcast(
ctx context.Context,
root [fieldparams.RootLength]byte,
proposerIndex primitives.ValidatorIndex,
slot primitives.Slot,
) error {
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%x", root),
"slot": slot,
})
// Get the time corresponding to the start of the slot.
genesisTime := uint64(s.cfg.chain.GenesisTime().Unix())
slotStartTime, err := slots.ToTime(genesisTime, slot)
if err != nil {
return errors.Wrap(err, "to time")
}
// Compute the waiting time. This could be negative. In such a case, broadcast immediately.
randFloat := s.reconstructionRandGen.Float64()
timeIntoSlot := broadcastMissingDataColumnsTimeIntoSlotMin + time.Duration(float64(broadcastMissingDataColumnsSlack)*randFloat)
broadcastTime := slotStartTime.Add(timeIntoSlot)
waitingTime := time.Until(broadcastTime)
time.AfterFunc(waitingTime, func() {
// Return early if the context was canceled during the waiting time.
if err := ctx.Err(); err != nil {
return
}
if err := s.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot); err != nil {
log.WithError(err).Error("Failed to broadcast missing data column sidecars")
}
})
return nil
}
func (s *Service) broadcastMissingDataColumnSidecars(
slot primitives.Slot,
proposerIndex primitives.ValidatorIndex,
root [fieldparams.RootLength]byte,
timeIntoSlot time.Duration,
) error {
// Get the node ID.
nodeID := s.cfg.p2p.NodeID()
// Get the custody group count.
custodyGroupCount := s.cfg.custodyInfo.ActualGroupCount()
// Retrieve the local node info.
localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
return errors.Wrap(err, "peerdas info")
}
// Compute the missing data columns (data columns we should custody but we did not received via gossip.)
missingColumns := make([]uint64, 0, len(localNodeInfo.CustodyColumns))
for column := range localNodeInfo.CustodyColumns {
if !s.hasSeenDataColumnIndex(slot, proposerIndex, column) {
missingColumns = append(missingColumns, column)
}
}
// Return early if there are no missing data columns.
if len(missingColumns) == 0 {
return nil
}
// Load from the store the non received but reconstructed data column.
verifiedRODataColumnSidecars, err := s.cfg.dataColumnStorage.Get(root, missingColumns)
if err != nil {
return errors.Wrap(err, "data column storage get")
}
broadcastedColumns := make([]uint64, 0, len(verifiedRODataColumnSidecars))
for _, verifiedRODataColumn := range verifiedRODataColumnSidecars {
broadcastedColumns = append(broadcastedColumns, verifiedRODataColumn.Index)
// Compute the subnet for this column.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(verifiedRODataColumn.Index)
// Broadcast the missing data column.
if err := s.cfg.p2p.BroadcastDataColumn(root, subnet, verifiedRODataColumn.DataColumnSidecar); err != nil {
log.WithError(err).Error("Broadcast data column")
}
// Now, we can set the data column as seen.
s.setSeenDataColumnIndex(slot, proposerIndex, verifiedRODataColumn.Index)
}
if logrus.GetLevel() >= logrus.DebugLevel {
// Sort for nice logging.
slices.Sort(broadcastedColumns)
slices.Sort(missingColumns)
log.WithFields(logrus.Fields{
"timeIntoSlot": timeIntoSlot,
"missingColumns": missingColumns,
"broadcasted": broadcastedColumns,
}).Debug("Start broadcasting not seen via gossip but reconstructed data columns")
}
return nil
}

View File

@@ -0,0 +1,191 @@
package sync
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
)
func TestReconstructDataColumns(t *testing.T) {
const blobCount = 4
numberOfColumns := params.BeaconConfig().NumberOfColumns
ctx := t.Context()
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns)))
root, block := roBlock.Root(), roBlock.Block()
slot, proposerIndex := block.Slot(), block.ProposerIndex()
minimumCount := peerdas.MinimumColumnsCountToReconstruct()
t.Run("not enough stored sidecars", func(t *testing.T) {
storage := filesystem.NewEphemeralDataColumnStorage(t)
err := storage.Save(verifiedRoDataColumns[:minimumCount-1])
require.NoError(t, err)
service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)), WithDataColumnStorage(storage))
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
require.NoError(t, err)
})
t.Run("all stored sidecars", func(t *testing.T) {
storage := filesystem.NewEphemeralDataColumnStorage(t)
err := storage.Save(verifiedRoDataColumns)
require.NoError(t, err)
service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)), WithDataColumnStorage(storage))
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
require.NoError(t, err)
})
t.Run("should reconstruct", func(t *testing.T) {
// Here we setup a cgc of 8, which is not realistic, since there is no
// real reason for a node to both:
// - store enough data column sidecars to enable reconstruction, and
// - custody not enough columns to enable reconstruction.
// However, for the needs of this test, this is perfectly fine.
const cgc = 8
storage := filesystem.NewEphemeralDataColumnStorage(t)
minimumCount := peerdas.MinimumColumnsCountToReconstruct()
err := storage.Save(verifiedRoDataColumns[:minimumCount])
require.NoError(t, err)
custodyInfo := &peerdas.CustodyInfo{}
custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc)
custodyInfo.ToAdvertiseGroupCount.Set(cgc)
service := NewService(
ctx,
WithP2P(p2ptest.NewTestP2P(t)),
WithDataColumnStorage(storage),
WithCustodyInfo(custodyInfo),
WithChainService(&mockChain.ChainService{}),
)
err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
require.NoError(t, err)
expected := make(map[uint64]bool, minimumCount+cgc)
for i := range minimumCount {
expected[i] = true
}
// The node should custody these indices.
for _, i := range [...]uint64{1, 17, 19, 42, 75, 87, 102, 117} {
expected[i] = true
}
summary := storage.Summary(root)
actual := summary.Stored()
require.Equal(t, len(expected), len(actual))
for index := range expected {
require.Equal(t, true, actual[index])
}
})
}
func TestBroadcastMissingDataColumnSidecars(t *testing.T) {
const (
cgc = 8
blobCount = 4
timeIntoSlot = 0
)
numberOfColumns := params.BeaconConfig().NumberOfColumns
ctx := t.Context()
// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)
roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns)))
root, block := roBlock.Root(), roBlock.Block()
slot, proposerIndex := block.Slot(), block.ProposerIndex()
t.Run("no missing sidecars", func(t *testing.T) {
custodyInfo := &peerdas.CustodyInfo{}
custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc)
custodyInfo.ToAdvertiseGroupCount.Set(cgc)
service := NewService(
ctx,
WithP2P(p2ptest.NewTestP2P(t)),
WithCustodyInfo(custodyInfo),
)
for _, index := range [...]uint64{1, 17, 19, 42, 75, 87, 102, 117} {
key := computeCacheKey(slot, proposerIndex, index)
service.seenDataColumnCache.Add(key, true)
}
err := service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot)
require.NoError(t, err)
})
t.Run("some missing sidecars", func(t *testing.T) {
custodyInfo := &peerdas.CustodyInfo{}
custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc)
custodyInfo.ToAdvertiseGroupCount.Set(cgc)
toSave := make([]blocks.VerifiedRODataColumn, 0, 2)
for _, index := range [...]uint64{42, 87} {
toSave = append(toSave, verifiedRoDataColumns[index])
}
p2p := p2ptest.NewTestP2P(t)
storage := filesystem.NewEphemeralDataColumnStorage(t)
err := storage.Save(toSave)
require.NoError(t, err)
service := NewService(
ctx,
WithP2P(p2p),
WithCustodyInfo(custodyInfo),
WithDataColumnStorage(storage),
)
for _, index := range [...]uint64{1, 17, 19, 102, 117} { // 42, 75 and 87 are missing
key := computeCacheKey(slot, proposerIndex, index)
service.seenDataColumnCache.Add(key, true)
}
for _, index := range [...]uint64{42, 75, 87} {
seen := service.hasSeenDataColumnIndex(slot, proposerIndex, index)
require.Equal(t, false, seen)
}
require.Equal(t, false, p2p.BroadcastCalled.Load())
err = service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot)
require.NoError(t, err)
seen := service.hasSeenDataColumnIndex(slot, proposerIndex, 75)
require.Equal(t, false, seen)
for _, index := range [...]uint64{42, 87} {
seen := service.hasSeenDataColumnIndex(slot, proposerIndex, index)
require.Equal(t, true, seen)
}
require.Equal(t, true, p2p.BroadcastCalled.Load())
})
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/OffchainLabs/prysm/v6/async/abool"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
@@ -46,6 +47,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
chain: chainService,
clock: startup.NewClock(gt, vr),
initialSync: &mockSync.Sync{IsSyncing: false},
custodyInfo: &peerdas.CustodyInfo{},
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
@@ -81,6 +83,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
chain: chainService,
clock: startup.NewClock(gt, vr),
initialSync: &mockSync.Sync{IsSyncing: false},
custodyInfo: &peerdas.CustodyInfo{},
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
@@ -125,6 +128,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
initialSync: &mockSync.Sync{IsSyncing: false},
custodyInfo: &peerdas.CustodyInfo{},
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
@@ -167,6 +171,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
chain: chainService,
clock: startup.NewClock(gt, vr),
initialSync: &mockSync.Sync{IsSyncing: false},
custodyInfo: &peerdas.CustodyInfo{},
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
@@ -211,6 +216,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
chain: chainService,
clock: startup.NewClock(gt, vr),
initialSync: &mockSync.Sync{IsSyncing: false},
custodyInfo: &peerdas.CustodyInfo{},
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
@@ -255,6 +261,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
chain: chainService,
clock: startup.NewClock(gt, vr),
initialSync: &mockSync.Sync{IsSyncing: false},
custodyInfo: &peerdas.CustodyInfo{},
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
@@ -274,6 +281,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
}
assert.Equal(t, true, rpcMap[p2p.RPCBlobSidecarsByRangeTopicV1+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist")
assert.Equal(t, true, rpcMap[p2p.RPCBlobSidecarsByRootTopicV1+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist")
assert.Equal(t, true, rpcMap[p2p.RPCMetaDataTopicV3+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist")
},
},
}

View File

@@ -210,6 +210,19 @@ var (
Buckets: []float64{100, 250, 500, 750, 1000, 1500, 2000, 4000, 8000, 12000, 16000},
},
)
dataColumnReconstructionCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_data_availability_reconstructed_columns_total",
Help: "Count the number of reconstructed data columns.",
})
dataColumnReconstructionHistogram = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "beacon_data_availability_reconstruction_time_milliseconds",
Help: "Captures the time taken to reconstruct data columns.",
Buckets: []float64{100, 250, 500, 750, 1000, 1500, 2000, 4000, 8000, 12000, 16000},
},
)
)
func (s *Service) updateMetrics() {

View File

@@ -7,6 +7,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
"github.com/OffchainLabs/prysm/v6/beacon-chain/execution"
@@ -198,6 +199,14 @@ func WithAvailableBlocker(avb coverage.AvailableBlocker) Option {
}
}
// WithCustodyInfo for custody info.
func WithCustodyInfo(custodyInfo *peerdas.CustodyInfo) Option {
return func(s *Service) error {
s.cfg.custodyInfo = custodyInfo
return nil
}
}
// WithSlasherEnabled configures the sync package to support slashing detection.
func WithSlasherEnabled(enabled bool) Option {
return func(s *Service) error {

View File

@@ -10,6 +10,8 @@ import (
"time"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/crypto/rand"
lru "github.com/hashicorp/golang-lru"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pcore "github.com/libp2p/go-libp2p/core"
@@ -103,12 +105,14 @@ type config struct {
stateNotifier statefeed.Notifier
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
custodyInfo *peerdas.CustodyInfo
}
// This defines the interface for interacting with block chain service
type blockchainService interface {
blockchain.BlockReceiver
blockchain.BlobReceiver
blockchain.DataColumnReceiver
blockchain.HeadFetcher
blockchain.FinalizationFetcher
blockchain.ForkFetcher
@@ -166,6 +170,8 @@ type Service struct {
newBlobVerifier verification.NewBlobVerifier
newColumnsVerifier verification.NewDataColumnsVerifier
availableBlocker coverage.AvailableBlocker
reconstructionLock sync.Mutex
reconstructionRandGen *rand.Rand
ctxMap ContextByteVersions
slasherEnabled bool
lcStore *lightClient.Store
@@ -185,6 +191,7 @@ func NewService(ctx context.Context, opts ...Option) *Service {
blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof),
signatureChan: make(chan *signatureVerifier, verifierLimit),
dataColumnLogCh: make(chan dataColumnLogEntry, 1000),
reconstructionRandGen: rand.NewGenerator(),
}
for _, opt := range opts {

View File

@@ -6,12 +6,14 @@ import (
"fmt"
"reflect"
"runtime/debug"
"slices"
"strings"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
@@ -188,14 +190,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
)
}
// New gossip topic in Fulu
// New gossip topic in Fulu.
if params.BeaconConfig().FuluForkEpoch <= epoch {
s.subscribeWithParameters(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn,
func(context.Context, proto.Message) error { return nil },
s.dataColumnSubscriber,
digest,
func(primitives.Slot) []uint64 { return nil },
s.dataColumnSubnetIndices,
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)
}
@@ -600,6 +602,19 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool {
return peersWithSubnetCount >= threshold
}
func (s *Service) dataColumnSubnetIndices(_ primitives.Slot) []uint64 {
nodeID := s.cfg.p2p.NodeID()
custodyGroupCount := s.cfg.custodyInfo.CustodyGroupSamplingSize(peerdas.Target)
nodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
log.WithError(err).Error("Could not retrieve peer info")
return []uint64{}
}
return sliceFromMap(nodeInfo.DataColumnsSubnets, true /*sorted*/)
}
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
if flags.Get().SubscribeToAllSubnets {
return sliceFromCount(params.BeaconConfig().AttestationSubnetCount)
@@ -727,3 +742,17 @@ func errorIsIgnored(err error) bool {
}
return false
}
// sliceFromMap returns a sorted list of keys from a map.
func sliceFromMap(m map[uint64]bool, sorted ...bool) []uint64 {
result := make([]uint64, 0, len(m))
for k := range m {
result = append(result, k)
}
if len(sorted) > 0 && sorted[0] {
slices.Sort(result)
}
return result
}

View File

@@ -0,0 +1,54 @@
package sync
import (
"context"
"fmt"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
opfeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) error {
sidecar, ok := msg.(blocks.VerifiedRODataColumn)
if !ok {
return fmt.Errorf("message was not type blocks.VerifiedRODataColumn, type=%T", msg)
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return errors.Wrap(err, "receive data column")
}
slot := sidecar.Slot()
proposerIndex := sidecar.ProposerIndex()
root := sidecar.BlockRoot()
if err := s.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root); err != nil {
return errors.Wrap(err, "reconstruct data columns")
}
return nil
}
func (s *Service) receiveDataColumnSidecar(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
slot := sidecar.SignedBlockHeader.Header.Slot
proposerIndex := sidecar.SignedBlockHeader.Header.ProposerIndex
columnIndex := sidecar.Index
s.setSeenDataColumnIndex(slot, proposerIndex, columnIndex)
if err := s.cfg.chain.ReceiveDataColumn(sidecar); err != nil {
return errors.Wrap(err, "receive data column")
}
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.DataColumnSidecarReceived,
Data: &opfeed.DataColumnSidecarReceivedData{
DataColumn: &sidecar,
},
})
return nil
}

View File

@@ -0,0 +1,2 @@
### Added
- PeerDAS: Implement reconstruction.