From b1ac8209b2c7f518726085537c42161027dd5e14 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 2 Jul 2025 21:02:55 +0200 Subject: [PATCH] PeerDAS: Implement reconstruct. (#15454) * PeerDAS: Implement reconstruct. * Fix Preston's comment. * Fix Preston's comment. --- beacon-chain/core/feed/operation/events.go | 12 +- beacon-chain/node/node.go | 1 + beacon-chain/sync/BUILD.bazel | 6 + beacon-chain/sync/data_columns_reconstruct.go | 208 ++++++++++++++++++ .../sync/data_columns_reconstruct_test.go | 191 ++++++++++++++++ beacon-chain/sync/fork_watcher_test.go | 8 + beacon-chain/sync/metrics.go | 13 ++ beacon-chain/sync/options.go | 9 + beacon-chain/sync/service.go | 25 ++- beacon-chain/sync/subscriber.go | 35 ++- .../sync/subscriber_data_column_sidecar.go | 54 +++++ changelog/manu-peerdas-reconstruction.md | 2 + 12 files changed, 550 insertions(+), 14 deletions(-) create mode 100644 beacon-chain/sync/data_columns_reconstruct.go create mode 100644 beacon-chain/sync/data_columns_reconstruct_test.go create mode 100644 beacon-chain/sync/subscriber_data_column_sidecar.go create mode 100644 changelog/manu-peerdas-reconstruction.md diff --git a/beacon-chain/core/feed/operation/events.go b/beacon-chain/core/feed/operation/events.go index 95507300dc..bc219ba3cf 100644 --- a/beacon-chain/core/feed/operation/events.go +++ b/beacon-chain/core/feed/operation/events.go @@ -38,11 +38,14 @@ const ( // SingleAttReceived is sent after a single attestation object is received from gossip or rpc SingleAttReceived = 9 + // DataColumnSidecarReceived is sent after a data column sidecar is received from gossip or rpc. + DataColumnSidecarReceived = 10 + // BlockGossipReceived is sent after a block has been received from gossip or API that passes validation rules. - BlockGossipReceived = 10 + BlockGossipReceived = 11 // DataColumnReceived is sent after a data column has been seen after gossip validation rules. - DataColumnReceived = 11 + DataColumnReceived = 12 ) // UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events. @@ -94,6 +97,11 @@ type SingleAttReceivedData struct { Attestation ethpb.Att } +// DataColumnSidecarReceivedData is the data sent with DataColumnSidecarReceived events. +type DataColumnSidecarReceivedData struct { + DataColumn *blocks.VerifiedRODataColumn +} + // BlockGossipReceivedData is the data sent with BlockGossipReceived events. type BlockGossipReceivedData struct { // SignedBlock is the block that was received. 
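For context, the hunk above only defines the new DataColumnSidecarReceived event type and its payload. A consumer of the operation feed would receive it roughly as sketched below, assuming Prysm's usual event-feed subscription pattern; the watchDataColumnSidecars helper and its wiring are hypothetical, not part of this patch.

package example

import (
	"context"

	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
	opfeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
	log "github.com/sirupsen/logrus"
)

// watchDataColumnSidecars (hypothetical) logs every data column sidecar
// announced on the operation feed until the context is canceled.
func watchDataColumnSidecars(ctx context.Context, notifier opfeed.Notifier) {
	ch := make(chan *feed.Event, 16)
	sub := notifier.OperationFeed().Subscribe(ch)
	defer sub.Unsubscribe()

	for {
		select {
		case ev := <-ch:
			if ev.Type != opfeed.DataColumnSidecarReceived {
				continue
			}
			data, ok := ev.Data.(*opfeed.DataColumnSidecarReceivedData)
			if !ok || data.DataColumn == nil {
				continue
			}
			log.WithField("columnIndex", data.DataColumn.Index).Info("Data column sidecar received")
		case <-ctx.Done():
			return
		}
	}
}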
diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go
index 244cab959a..f0692affdc 100644
--- a/beacon-chain/node/node.go
+++ b/beacon-chain/node/node.go
@@ -886,6 +886,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
 			regularsync.WithDataColumnStorage(b.DataColumnStorage),
 			regularsync.WithVerifierWaiter(b.verifyInitWaiter),
 			regularsync.WithAvailableBlocker(bFillStore),
+			regularsync.WithCustodyInfo(b.custodyInfo),
 			regularsync.WithSlasherEnabled(b.slasherEnabled),
 			regularsync.WithLightClientStore(b.lcStore),
 		)
diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel
index 2d2acc78c4..f08a1bcc75 100644
--- a/beacon-chain/sync/BUILD.bazel
+++ b/beacon-chain/sync/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
         "block_batcher.go",
         "broadcast_bls_changes.go",
         "context.go",
+        "data_columns_reconstruct.go",
         "deadlines.go",
         "decode_pubsub.go",
         "doc.go",
@@ -40,6 +41,7 @@ go_library(
         "subscriber_beacon_blocks.go",
         "subscriber_blob_sidecar.go",
         "subscriber_bls_to_execution_change.go",
+        "subscriber_data_column_sidecar.go",
         "subscriber_handlers.go",
         "subscriber_light_client.go",
         "subscriber_sync_committee_message.go",
@@ -78,6 +80,7 @@ go_library(
         "//beacon-chain/core/feed/state:go_default_library",
         "//beacon-chain/core/helpers:go_default_library",
         "//beacon-chain/core/light-client:go_default_library",
+        "//beacon-chain/core/peerdas:go_default_library",
         "//beacon-chain/core/signing:go_default_library",
         "//beacon-chain/core/transition:go_default_library",
         "//beacon-chain/core/transition/interop:go_default_library",
@@ -162,6 +165,7 @@ go_test(
         "block_batcher_test.go",
         "broadcast_bls_changes_test.go",
         "context_test.go",
+        "data_columns_reconstruct_test.go",
         "decode_pubsub_test.go",
         "error_test.go",
         "fork_watcher_test.go",
@@ -207,6 +211,7 @@ go_test(
     deps = [
         "//async/abool:go_default_library",
         "//beacon-chain/blockchain:go_default_library",
+        "//beacon-chain/blockchain/kzg:go_default_library",
         "//beacon-chain/blockchain/testing:go_default_library",
         "//beacon-chain/cache:go_default_library",
         "//beacon-chain/core/altair:go_default_library",
@@ -214,6 +219,7 @@ go_test(
         "//beacon-chain/core/feed/operation:go_default_library",
         "//beacon-chain/core/helpers:go_default_library",
         "//beacon-chain/core/light-client:go_default_library",
+        "//beacon-chain/core/peerdas:go_default_library",
         "//beacon-chain/core/signing:go_default_library",
         "//beacon-chain/core/time:go_default_library",
         "//beacon-chain/core/transition:go_default_library",
diff --git a/beacon-chain/sync/data_columns_reconstruct.go b/beacon-chain/sync/data_columns_reconstruct.go
new file mode 100644
index 0000000000..36662ab9f8
--- /dev/null
+++ b/beacon-chain/sync/data_columns_reconstruct.go
@@ -0,0 +1,208 @@
+package sync
+
+import (
+	"context"
+	"fmt"
+	"slices"
+	"time"
+
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
+	fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
+	"github.com/OffchainLabs/prysm/v6/config/params"
+	"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
+	"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
+	"github.com/OffchainLabs/prysm/v6/time/slots"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	broadcastMissingDataColumnsTimeIntoSlotMin = 1 * time.Second
+	broadcastMissingDataColumnsSlack           = 2 * time.Second
+)
+
+// reconstructSaveBroadcastDataColumnSidecars reconstructs all data column
+// sidecars when reconstruction is both possible and needed, then saves the
+// missing sidecars into the store.
+// After a delay, it broadcasts in the background the sidecars that were
+// reconstructed but not seen via gossip.
+func (s *Service) reconstructSaveBroadcastDataColumnSidecars(
+	ctx context.Context,
+	slot primitives.Slot,
+	proposerIndex primitives.ValidatorIndex,
+	root [fieldparams.RootLength]byte,
+) error {
+	startTime := time.Now()
+
+	// Get the columns we store.
+	storedDataColumns := s.cfg.dataColumnStorage.Summary(root)
+	storedColumnsCount := storedDataColumns.Count()
+	numberOfColumns := params.BeaconConfig().NumberOfColumns
+
+	// Lock to prevent concurrent reconstructions.
+	s.reconstructionLock.Lock()
+	defer s.reconstructionLock.Unlock()
+
+	// If reconstruction is not possible or if all columns are already stored, exit early.
+	if storedColumnsCount < peerdas.MinimumColumnsCountToReconstruct() || storedColumnsCount == numberOfColumns {
+		return nil
+	}
+
+	// Retrieve our local node info.
+	nodeID := s.cfg.p2p.NodeID()
+	custodyGroupCount := s.cfg.custodyInfo.ActualGroupCount()
+	localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
+	if err != nil {
+		return errors.Wrap(err, "peer info")
+	}
+
+	// Load all available data column sidecars to minimize reconstruction time.
+	verifiedSidecars, err := s.cfg.dataColumnStorage.Get(root, nil)
+	if err != nil {
+		return errors.Wrap(err, "get data column sidecars")
+	}
+
+	// Reconstruct all the data column sidecars.
+	reconstructedSidecars, err := peerdas.ReconstructDataColumnSidecars(verifiedSidecars)
+	if err != nil {
+		return errors.Wrap(err, "reconstruct data column sidecars")
+	}
+
+	// Filter reconstructed sidecars to save.
+	custodyColumns := localNodeInfo.CustodyColumns
+	toSaveSidecars := make([]blocks.VerifiedRODataColumn, 0, len(custodyColumns))
+	for _, sidecar := range reconstructedSidecars {
+		if custodyColumns[sidecar.Index] {
+			toSaveSidecars = append(toSaveSidecars, sidecar)
+		}
+	}
+
+	// Save the data column sidecars in the database.
+	// Note: We do not call `receiveDataColumn`, because doing so would mark the
+	// columns as seen and cause gossip to ignore incoming data columns before we
+	// have broadcast the reconstructed ones.
+	if err := s.cfg.dataColumnStorage.Save(toSaveSidecars); err != nil {
+		return errors.Wrap(err, "save data column sidecars")
+	}
+
+	// Update reconstruction metrics.
+	dataColumnReconstructionHistogram.Observe(float64(time.Since(startTime).Milliseconds()))
+	dataColumnReconstructionCounter.Add(float64(len(reconstructedSidecars) - len(verifiedSidecars)))
+
+	// Schedule the broadcast.
+	if err := s.scheduleMissingDataColumnSidecarsBroadcast(ctx, root, proposerIndex, slot); err != nil {
+		return errors.Wrap(err, "schedule reconstructed data columns broadcast")
+	}
+
+	log.WithFields(logrus.Fields{
+		"root":             fmt.Sprintf("%#x", root),
+		"slot":             slot,
+		"fromColumnsCount": storedColumnsCount,
+	}).Debug("Data columns reconstructed and saved")
+
+	return nil
+}
+
+// scheduleMissingDataColumnSidecarsBroadcast schedules the broadcast of missing
+// (i.e., reconstructed but not seen via gossip) sidecars.
+func (s *Service) scheduleMissingDataColumnSidecarsBroadcast(
+	ctx context.Context,
+	root [fieldparams.RootLength]byte,
+	proposerIndex primitives.ValidatorIndex,
+	slot primitives.Slot,
+) error {
+	log := log.WithFields(logrus.Fields{
+		"root": fmt.Sprintf("%x", root),
+		"slot": slot,
+	})
+
+	// Get the time corresponding to the start of the slot.
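+	// Note: slots.ToTime resolves the slot to wall-clock time, i.e.
+	// genesis time + slot * SECONDS_PER_SLOT.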
+	genesisTime := uint64(s.cfg.chain.GenesisTime().Unix())
+	slotStartTime, err := slots.ToTime(genesisTime, slot)
+	if err != nil {
+		return errors.Wrap(err, "to time")
+	}
+
+	// Compute the waiting time. The broadcast time is drawn uniformly from
+	// 1s-3s into the slot (with the current constants), so reconstructing nodes
+	// do not all broadcast at the same moment. The waiting time can be negative;
+	// in that case, time.AfterFunc fires immediately.
+	randFloat := s.reconstructionRandGen.Float64()
+	timeIntoSlot := broadcastMissingDataColumnsTimeIntoSlotMin + time.Duration(float64(broadcastMissingDataColumnsSlack)*randFloat)
+	broadcastTime := slotStartTime.Add(timeIntoSlot)
+	waitingTime := time.Until(broadcastTime)
+	time.AfterFunc(waitingTime, func() {
+		// Return early if the context was canceled during the waiting time.
+		if err := ctx.Err(); err != nil {
+			return
+		}
+
+		if err := s.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot); err != nil {
+			log.WithError(err).Error("Failed to broadcast missing data column sidecars")
+		}
+	})
+
+	return nil
+}
+
+func (s *Service) broadcastMissingDataColumnSidecars(
+	slot primitives.Slot,
+	proposerIndex primitives.ValidatorIndex,
+	root [fieldparams.RootLength]byte,
+	timeIntoSlot time.Duration,
+) error {
+	// Get the node ID.
+	nodeID := s.cfg.p2p.NodeID()
+
+	// Get the custody group count.
+	custodyGroupCount := s.cfg.custodyInfo.ActualGroupCount()
+
+	// Retrieve the local node info.
+	localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
+	if err != nil {
+		return errors.Wrap(err, "peerdas info")
+	}
+
+	// Compute the missing data columns (data columns we should custody but have not received via gossip).
+	missingColumns := make([]uint64, 0, len(localNodeInfo.CustodyColumns))
+	for column := range localNodeInfo.CustodyColumns {
+		if !s.hasSeenDataColumnIndex(slot, proposerIndex, column) {
+			missingColumns = append(missingColumns, column)
+		}
+	}
+
+	// Return early if there are no missing data columns.
+	if len(missingColumns) == 0 {
+		return nil
+	}
+
+	// Load from the store the reconstructed data columns that were not received.
+	verifiedRODataColumnSidecars, err := s.cfg.dataColumnStorage.Get(root, missingColumns)
+	if err != nil {
+		return errors.Wrap(err, "data column storage get")
+	}
+
+	broadcastedColumns := make([]uint64, 0, len(verifiedRODataColumnSidecars))
+	for _, verifiedRODataColumn := range verifiedRODataColumnSidecars {
+		broadcastedColumns = append(broadcastedColumns, verifiedRODataColumn.Index)
+		// Compute the subnet for this column.
+		subnet := peerdas.ComputeSubnetForDataColumnSidecar(verifiedRODataColumn.Index)
+
+		// Broadcast the missing data column.
+		if err := s.cfg.p2p.BroadcastDataColumn(root, subnet, verifiedRODataColumn.DataColumnSidecar); err != nil {
+			log.WithError(err).Error("Broadcast data column")
+		}
+
+		// Now, we can mark the data column as seen.
+		s.setSeenDataColumnIndex(slot, proposerIndex, verifiedRODataColumn.Index)
+	}
+
+	if logrus.GetLevel() >= logrus.DebugLevel {
+		// Sort for nice logging.
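+		// Sorting affects only the log output below; it does not change
+		// broadcast behavior, which already happened above.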
+		slices.Sort(broadcastedColumns)
+		slices.Sort(missingColumns)
+
+		log.WithFields(logrus.Fields{
+			"timeIntoSlot":   timeIntoSlot,
+			"missingColumns": missingColumns,
+			"broadcasted":    broadcastedColumns,
+		}).Debug("Broadcasting reconstructed data columns not seen via gossip")
+	}
+
+	return nil
+}
diff --git a/beacon-chain/sync/data_columns_reconstruct_test.go b/beacon-chain/sync/data_columns_reconstruct_test.go
new file mode 100644
index 0000000000..20e3e0f239
--- /dev/null
+++ b/beacon-chain/sync/data_columns_reconstruct_test.go
@@ -0,0 +1,191 @@
+package sync
+
+import (
+	"testing"
+
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
+	mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
+	p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
+	"github.com/OffchainLabs/prysm/v6/config/params"
+	"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
+	"github.com/OffchainLabs/prysm/v6/testing/require"
+	"github.com/OffchainLabs/prysm/v6/testing/util"
+)
+
+func TestReconstructDataColumns(t *testing.T) {
+	const blobCount = 4
+	numberOfColumns := params.BeaconConfig().NumberOfColumns
+
+	ctx := t.Context()
+
+	// Start the trusted setup.
+	err := kzg.Start()
+	require.NoError(t, err)
+
+	roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
+	require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns)))
+
+	root, block := roBlock.Root(), roBlock.Block()
+	slot, proposerIndex := block.Slot(), block.ProposerIndex()
+
+	minimumCount := peerdas.MinimumColumnsCountToReconstruct()
+
+	t.Run("not enough stored sidecars", func(t *testing.T) {
+		storage := filesystem.NewEphemeralDataColumnStorage(t)
+		err := storage.Save(verifiedRoDataColumns[:minimumCount-1])
+		require.NoError(t, err)
+
+		service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)), WithDataColumnStorage(storage))
+		err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
+		require.NoError(t, err)
+	})
+
+	t.Run("all stored sidecars", func(t *testing.T) {
+		storage := filesystem.NewEphemeralDataColumnStorage(t)
+		err := storage.Save(verifiedRoDataColumns)
+		require.NoError(t, err)
+
+		service := NewService(ctx, WithP2P(p2ptest.NewTestP2P(t)), WithDataColumnStorage(storage))
+		err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root)
+		require.NoError(t, err)
+	})
+
+	t.Run("should reconstruct", func(t *testing.T) {
+		// Here we set up a cgc of 8, which is not realistic, since there is no
+		// real reason for a node to both:
+		// - store enough data column sidecars to enable reconstruction, and
+		// - custody too few columns to enable reconstruction.
+		// However, for the needs of this test, this is perfectly fine.
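+		// cgc is the custody group count configured for this test node.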
+ const cgc = 8 + + storage := filesystem.NewEphemeralDataColumnStorage(t) + minimumCount := peerdas.MinimumColumnsCountToReconstruct() + err := storage.Save(verifiedRoDataColumns[:minimumCount]) + require.NoError(t, err) + + custodyInfo := &peerdas.CustodyInfo{} + custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc) + custodyInfo.ToAdvertiseGroupCount.Set(cgc) + + service := NewService( + ctx, + WithP2P(p2ptest.NewTestP2P(t)), + WithDataColumnStorage(storage), + WithCustodyInfo(custodyInfo), + WithChainService(&mockChain.ChainService{}), + ) + + err = service.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root) + require.NoError(t, err) + + expected := make(map[uint64]bool, minimumCount+cgc) + for i := range minimumCount { + expected[i] = true + } + + // The node should custody these indices. + for _, i := range [...]uint64{1, 17, 19, 42, 75, 87, 102, 117} { + expected[i] = true + } + + summary := storage.Summary(root) + actual := summary.Stored() + + require.Equal(t, len(expected), len(actual)) + for index := range expected { + require.Equal(t, true, actual[index]) + } + }) +} + +func TestBroadcastMissingDataColumnSidecars(t *testing.T) { + const ( + cgc = 8 + blobCount = 4 + timeIntoSlot = 0 + ) + + numberOfColumns := params.BeaconConfig().NumberOfColumns + ctx := t.Context() + + // Start the trusted setup. + err := kzg.Start() + require.NoError(t, err) + + roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount) + require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns))) + + root, block := roBlock.Root(), roBlock.Block() + slot, proposerIndex := block.Slot(), block.ProposerIndex() + + t.Run("no missing sidecars", func(t *testing.T) { + custodyInfo := &peerdas.CustodyInfo{} + custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc) + custodyInfo.ToAdvertiseGroupCount.Set(cgc) + + service := NewService( + ctx, + WithP2P(p2ptest.NewTestP2P(t)), + WithCustodyInfo(custodyInfo), + ) + + for _, index := range [...]uint64{1, 17, 19, 42, 75, 87, 102, 117} { + key := computeCacheKey(slot, proposerIndex, index) + service.seenDataColumnCache.Add(key, true) + } + + err := service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot) + require.NoError(t, err) + }) + + t.Run("some missing sidecars", func(t *testing.T) { + custodyInfo := &peerdas.CustodyInfo{} + custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc) + custodyInfo.ToAdvertiseGroupCount.Set(cgc) + + toSave := make([]blocks.VerifiedRODataColumn, 0, 2) + for _, index := range [...]uint64{42, 87} { + toSave = append(toSave, verifiedRoDataColumns[index]) + } + + p2p := p2ptest.NewTestP2P(t) + storage := filesystem.NewEphemeralDataColumnStorage(t) + err := storage.Save(toSave) + require.NoError(t, err) + + service := NewService( + ctx, + WithP2P(p2p), + WithCustodyInfo(custodyInfo), + WithDataColumnStorage(storage), + ) + + for _, index := range [...]uint64{1, 17, 19, 102, 117} { // 42, 75 and 87 are missing + key := computeCacheKey(slot, proposerIndex, index) + service.seenDataColumnCache.Add(key, true) + } + + for _, index := range [...]uint64{42, 75, 87} { + seen := service.hasSeenDataColumnIndex(slot, proposerIndex, index) + require.Equal(t, false, seen) + } + + require.Equal(t, false, p2p.BroadcastCalled.Load()) + + err = service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot) + require.NoError(t, err) + + seen := service.hasSeenDataColumnIndex(slot, proposerIndex, 75) + require.Equal(t, 
false, seen) + + for _, index := range [...]uint64{42, 87} { + seen := service.hasSeenDataColumnIndex(slot, proposerIndex, index) + require.Equal(t, true, seen) + } + + require.Equal(t, true, p2p.BroadcastCalled.Load()) + + }) +} diff --git a/beacon-chain/sync/fork_watcher_test.go b/beacon-chain/sync/fork_watcher_test.go index afa28810da..6fd3e69b43 100644 --- a/beacon-chain/sync/fork_watcher_test.go +++ b/beacon-chain/sync/fork_watcher_test.go @@ -7,6 +7,7 @@ import ( "github.com/OffchainLabs/prysm/v6/async/abool" mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" @@ -46,6 +47,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) { chain: chainService, clock: startup.NewClock(gt, vr), initialSync: &mockSync.Sync{IsSyncing: false}, + custodyInfo: &peerdas.CustodyInfo{}, }, chainStarted: abool.New(), subHandler: newSubTopicHandler(), @@ -81,6 +83,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) { chain: chainService, clock: startup.NewClock(gt, vr), initialSync: &mockSync.Sync{IsSyncing: false}, + custodyInfo: &peerdas.CustodyInfo{}, }, chainStarted: abool.New(), subHandler: newSubTopicHandler(), @@ -125,6 +128,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) { chain: chainService, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), initialSync: &mockSync.Sync{IsSyncing: false}, + custodyInfo: &peerdas.CustodyInfo{}, }, chainStarted: abool.New(), subHandler: newSubTopicHandler(), @@ -167,6 +171,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) { chain: chainService, clock: startup.NewClock(gt, vr), initialSync: &mockSync.Sync{IsSyncing: false}, + custodyInfo: &peerdas.CustodyInfo{}, }, chainStarted: abool.New(), subHandler: newSubTopicHandler(), @@ -211,6 +216,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) { chain: chainService, clock: startup.NewClock(gt, vr), initialSync: &mockSync.Sync{IsSyncing: false}, + custodyInfo: &peerdas.CustodyInfo{}, }, chainStarted: abool.New(), subHandler: newSubTopicHandler(), @@ -255,6 +261,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) { chain: chainService, clock: startup.NewClock(gt, vr), initialSync: &mockSync.Sync{IsSyncing: false}, + custodyInfo: &peerdas.CustodyInfo{}, }, chainStarted: abool.New(), subHandler: newSubTopicHandler(), @@ -274,6 +281,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) { } assert.Equal(t, true, rpcMap[p2p.RPCBlobSidecarsByRangeTopicV1+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist") assert.Equal(t, true, rpcMap[p2p.RPCBlobSidecarsByRootTopicV1+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist") + assert.Equal(t, true, rpcMap[p2p.RPCMetaDataTopicV3+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist") }, }, } diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index bc69659675..186e80d192 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -210,6 +210,19 @@ var ( Buckets: []float64{100, 250, 500, 750, 1000, 1500, 2000, 4000, 8000, 12000, 16000}, }, ) + + dataColumnReconstructionCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "beacon_data_availability_reconstructed_columns_total", + Help: "Count the number of reconstructed data columns.", + }) + + dataColumnReconstructionHistogram = 
promauto.NewHistogram(
+		prometheus.HistogramOpts{
+			Name:    "beacon_data_availability_reconstruction_time_milliseconds",
+			Help:    "Captures the time taken to reconstruct data columns.",
+			Buckets: []float64{100, 250, 500, 750, 1000, 1500, 2000, 4000, 8000, 12000, 16000},
+		},
+	)
 )
 
 func (s *Service) updateMetrics() {
diff --git a/beacon-chain/sync/options.go b/beacon-chain/sync/options.go
index ac0fa2e354..01ab90b2af 100644
--- a/beacon-chain/sync/options.go
+++ b/beacon-chain/sync/options.go
@@ -7,6 +7,7 @@ import (
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
 	statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
 	lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/execution"
@@ -198,6 +199,14 @@ func WithAvailableBlocker(avb coverage.AvailableBlocker) Option {
 	}
 }
 
+// WithCustodyInfo sets the custody info used by the sync service.
+func WithCustodyInfo(custodyInfo *peerdas.CustodyInfo) Option {
+	return func(s *Service) error {
+		s.cfg.custodyInfo = custodyInfo
+		return nil
+	}
+}
+
 // WithSlasherEnabled configures the sync package to support slashing detection.
 func WithSlasherEnabled(enabled bool) Option {
 	return func(s *Service) error {
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index f6ec74f71b..94b5223c54 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -10,6 +10,8 @@ import (
 	"time"
 
 	lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
+	"github.com/OffchainLabs/prysm/v6/crypto/rand"
 	lru "github.com/hashicorp/golang-lru"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	libp2pcore "github.com/libp2p/go-libp2p/core"
@@ -103,12 +105,14 @@ type config struct {
 	stateNotifier             statefeed.Notifier
 	blobStorage               *filesystem.BlobStorage
 	dataColumnStorage         *filesystem.DataColumnStorage
+	custodyInfo               *peerdas.CustodyInfo
 }
 
 // This defines the interface for interacting with block chain service
 type blockchainService interface {
 	blockchain.BlockReceiver
 	blockchain.BlobReceiver
+	blockchain.DataColumnReceiver
 	blockchain.HeadFetcher
 	blockchain.FinalizationFetcher
 	blockchain.ForkFetcher
@@ -166,6 +170,8 @@ type Service struct {
 	newBlobVerifier         verification.NewBlobVerifier
 	newColumnsVerifier      verification.NewDataColumnsVerifier
 	availableBlocker        coverage.AvailableBlocker
+	reconstructionLock      sync.Mutex
+	reconstructionRandGen   *rand.Rand
 	ctxMap                  ContextByteVersions
 	slasherEnabled          bool
 	lcStore                 *lightClient.Store
@@ -176,15 +182,16 @@ func NewService(ctx context.Context, opts ...Option) *Service {
 	ctx, cancel := context.WithCancel(ctx)
 	r := &Service{
-		ctx:                  ctx,
-		cancel:               cancel,
-		chainStarted:         abool.New(),
-		cfg:                  &config{clock: startup.NewClock(time.Unix(0, 0), [32]byte{})},
-		slotToPendingBlocks:  gcache.New(pendingBlockExpTime /* exp time */, 0 /* disable janitor */),
-		seenPendingBlocks:    make(map[[32]byte]bool),
-		blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof),
-		signatureChan:        make(chan *signatureVerifier, verifierLimit),
-		dataColumnLogCh:      make(chan dataColumnLogEntry, 1000),
+		ctx:                   ctx,
+		cancel:                cancel,
+		chainStarted:          abool.New(),
+		cfg:                   &config{clock: startup.NewClock(time.Unix(0, 0), [32]byte{})},
+		slotToPendingBlocks:   gcache.New(pendingBlockExpTime /* exp time */, 0 /* disable janitor */),
+		seenPendingBlocks:     make(map[[32]byte]bool),
+		blkRootToPendingAtts:  make(map[[32]byte][]ethpb.SignedAggregateAttAndProof),
+		signatureChan:         make(chan *signatureVerifier, verifierLimit),
+		dataColumnLogCh:       make(chan dataColumnLogEntry, 1000),
+		reconstructionRandGen: rand.NewGenerator(),
 	}
 
 	for _, opt := range opts {
diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go
index 730731eba4..51d59bd837 100644
--- a/beacon-chain/sync/subscriber.go
+++ b/beacon-chain/sync/subscriber.go
@@ -6,12 +6,14 @@ import (
 	"fmt"
 	"reflect"
 	"runtime/debug"
+	"slices"
 	"strings"
 	"time"
 
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
 	"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
@@ -188,14 +190,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
 		)
 	}
 
-	// New gossip topic in Fulu
+	// New gossip topic in Fulu.
 	if params.BeaconConfig().FuluForkEpoch <= epoch {
 		s.subscribeWithParameters(
 			p2p.DataColumnSubnetTopicFormat,
 			s.validateDataColumn,
-			func(context.Context, proto.Message) error { return nil },
+			s.dataColumnSubscriber,
 			digest,
-			func(primitives.Slot) []uint64 { return nil },
+			s.dataColumnSubnetIndices,
 			func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
 		)
 	}
@@ -600,6 +602,19 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool {
 	return peersWithSubnetCount >= threshold
 }
 
+func (s *Service) dataColumnSubnetIndices(_ primitives.Slot) []uint64 {
+	nodeID := s.cfg.p2p.NodeID()
+	custodyGroupCount := s.cfg.custodyInfo.CustodyGroupSamplingSize(peerdas.Target)
+
+	nodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
+	if err != nil {
+		log.WithError(err).Error("Could not retrieve peer info")
+		return []uint64{}
+	}
+
+	return sliceFromMap(nodeInfo.DataColumnsSubnets, true /*sorted*/)
+}
+
 func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
 	if flags.Get().SubscribeToAllSubnets {
 		return sliceFromCount(params.BeaconConfig().AttestationSubnetCount)
@@ -727,3 +742,17 @@ func errorIsIgnored(err error) bool {
 	}
 	return false
 }
+
+// sliceFromMap returns the keys of a map, sorted in ascending order when the
+// optional sorted flag is set to true.
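+// Go map iteration order is randomized, so callers that need a stable,
+// deterministic order must request sorting.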
+func sliceFromMap(m map[uint64]bool, sorted ...bool) []uint64 { + result := make([]uint64, 0, len(m)) + for k := range m { + result = append(result, k) + } + + if len(sorted) > 0 && sorted[0] { + slices.Sort(result) + } + + return result +} diff --git a/beacon-chain/sync/subscriber_data_column_sidecar.go b/beacon-chain/sync/subscriber_data_column_sidecar.go new file mode 100644 index 0000000000..144da4896a --- /dev/null +++ b/beacon-chain/sync/subscriber_data_column_sidecar.go @@ -0,0 +1,54 @@ +package sync + +import ( + "context" + "fmt" + + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed" + opfeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation" + "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" + "github.com/pkg/errors" + "google.golang.org/protobuf/proto" +) + +func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) error { + sidecar, ok := msg.(blocks.VerifiedRODataColumn) + if !ok { + return fmt.Errorf("message was not type blocks.VerifiedRODataColumn, type=%T", msg) + } + + if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil { + return errors.Wrap(err, "receive data column") + } + + slot := sidecar.Slot() + proposerIndex := sidecar.ProposerIndex() + root := sidecar.BlockRoot() + + if err := s.reconstructSaveBroadcastDataColumnSidecars(ctx, slot, proposerIndex, root); err != nil { + return errors.Wrap(err, "reconstruct data columns") + } + + return nil +} + +func (s *Service) receiveDataColumnSidecar(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error { + slot := sidecar.SignedBlockHeader.Header.Slot + proposerIndex := sidecar.SignedBlockHeader.Header.ProposerIndex + columnIndex := sidecar.Index + + s.setSeenDataColumnIndex(slot, proposerIndex, columnIndex) + + if err := s.cfg.chain.ReceiveDataColumn(sidecar); err != nil { + return errors.Wrap(err, "receive data column") + } + + s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{ + Type: opfeed.DataColumnSidecarReceived, + Data: &opfeed.DataColumnSidecarReceivedData{ + DataColumn: &sidecar, + }, + }) + + return nil +} diff --git a/changelog/manu-peerdas-reconstruction.md b/changelog/manu-peerdas-reconstruction.md new file mode 100644 index 0000000000..d62d9c28e9 --- /dev/null +++ b/changelog/manu-peerdas-reconstruction.md @@ -0,0 +1,2 @@ +### Added +- PeerDAS: Implement reconstruction.
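For context, the reconstruction early exit relies on the Reed-Solomon extension property that any half of the extended column set suffices to recover all of it. The sketch below is illustrative only: it assumes MinimumColumnsCountToReconstruct returns NUMBER_OF_COLUMNS / 2 (64 of 128 with mainnet parameters); the real constant lives in the peerdas package.

package main

import "fmt"

// Assumed mainnet parameter; the real value comes from
// params.BeaconConfig().NumberOfColumns.
const numberOfColumns uint64 = 128

// minimumColumnsCountToReconstruct sketches the assumed behavior of
// peerdas.MinimumColumnsCountToReconstruct: any half of the extended
// columns is enough to recover the rest.
func minimumColumnsCountToReconstruct() uint64 {
	return numberOfColumns / 2
}

func main() {
	for _, stored := range []uint64{10, 64, 128} {
		// Mirrors the early-exit condition in reconstructSaveBroadcastDataColumnSidecars:
		// skip when reconstruction is impossible (too few columns) or unnecessary (all stored).
		skip := stored < minimumColumnsCountToReconstruct() || stored == numberOfColumns
		fmt.Printf("stored=%d skip=%t\n", stored, skip)
	}
}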