diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 4dd566c6c5..9ad9352146 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "receive_attestation.go", "receive_blob.go", "receive_block.go", + "receive_data_column.go", "service.go", "setup_forchoice.go", "tracked_proposer.go", @@ -50,6 +51,7 @@ go_library( "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/light-client:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/signing:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", @@ -146,6 +148,7 @@ go_test( "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/light-client:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/signing:go_default_library", "//beacon-chain/core/transition:go_default_library", "//beacon-chain/das:go_default_library", diff --git a/beacon-chain/blockchain/error.go b/beacon-chain/blockchain/error.go index f535292c0a..a530f8adba 100644 --- a/beacon-chain/blockchain/error.go +++ b/beacon-chain/blockchain/error.go @@ -40,10 +40,12 @@ var ( errNotGenesisRoot = errors.New("root is not the genesis block root") // errBlacklistedBlock is returned when a block is blacklisted as invalid. errBlacklistedRoot = verification.AsVerificationFailure(errors.New("block root is blacklisted")) + // errMaxBlobsExceeded is returned when the number of blobs in a block exceeds the maximum allowed. + errMaxBlobsExceeded = verification.AsVerificationFailure(errors.New("expected commitments in block exceeds MAX_BLOBS_PER_BLOCK")) + // errMaxDataColumnsExceeded is returned when the number of data columns exceeds the maximum allowed. + errMaxDataColumnsExceeded = verification.AsVerificationFailure(errors.New("expected data columns for node exceeds NUMBER_OF_COLUMNS")) ) -var errMaxBlobsExceeded = verification.AsVerificationFailure(errors.New("Expected commitments in block exceeds MAX_BLOBS_PER_BLOCK")) - // An invalid block is the block that fails state transition based on the core protocol rules. // The beacon node shall not be accepting nor building blocks that branch off from an invalid block. // Some examples of invalid blocks are: diff --git a/beacon-chain/blockchain/execution_engine.go b/beacon-chain/blockchain/execution_engine.go index f78af3ae89..5884d16635 100644 --- a/beacon-chain/blockchain/execution_engine.go +++ b/beacon-chain/blockchain/execution_engine.go @@ -439,6 +439,9 @@ func (s *Service) removeInvalidBlockAndState(ctx context.Context, blkRoots [][32 // Blobs may not exist for some blocks, leading to deletion failures. Log such errors at debug level. 
log.WithError(err).Debug("Could not remove blob from blob storage") } + if err := s.dataColumnStorage.Remove(root); err != nil { + log.WithError(err).Errorf("Could not remove data columns from data column storage for root %#x", root) + } } return nil } diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index c9aa2db5d0..ea483b338b 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -1,10 +1,13 @@ package blockchain import ( + "time" + "github.com/OffchainLabs/prysm/v6/async/event" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state" lightclient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/db" "github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem" "github.com/OffchainLabs/prysm/v6/beacon-chain/execution" @@ -127,9 +130,9 @@ func WithBLSToExecPool(p blstoexec.PoolManager) Option { } // WithP2PBroadcaster to broadcast messages after appropriate processing. -func WithP2PBroadcaster(p p2p.Broadcaster) Option { +func WithP2PBroadcaster(p p2p.Accessor) Option { return func(s *Service) error { - s.cfg.P2p = p + s.cfg.P2P = p return nil } } @@ -208,6 +211,15 @@ func WithBlobStorage(b *filesystem.BlobStorage) Option { } } +// WithDataColumnStorage sets the data column storage backend for the blockchain service. +func WithDataColumnStorage(b *filesystem.DataColumnStorage) Option { + return func(s *Service) error { + s.dataColumnStorage = b + return nil + } +} + +// WithSyncChecker sets the sync checker for the blockchain service. func WithSyncChecker(checker Checker) Option { return func(s *Service) error { s.cfg.SyncChecker = checker @@ -215,6 +227,15 @@ func WithSyncChecker(checker Checker) Option { } } +// WithCustodyInfo sets the custody info for the blockchain service. +func WithCustodyInfo(custodyInfo *peerdas.CustodyInfo) Option { + return func(s *Service) error { + s.cfg.CustodyInfo = custodyInfo + return nil + } +} + +// WithSlasherEnabled sets whether the slasher is enabled or not. func WithSlasherEnabled(enabled bool) Option { return func(s *Service) error { s.slasherEnabled = enabled @@ -222,6 +243,15 @@ func WithSlasherEnabled(enabled bool) Option { } } +// WithGenesisTime sets the genesis time for the blockchain service. +func WithGenesisTime(genesisTime time.Time) Option { + return func(s *Service) error { + s.genesisTime = genesisTime + return nil + } +} + +// WithLightClientStore sets the light client store for the blockchain service. 
func WithLightClientStore(lcs *lightclient.Store) Option { return func(s *Service) error { s.lcStore = lcs diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 27332796ab..e9f1fada2a 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -3,10 +3,12 @@ package blockchain import ( "context" "fmt" + "slices" "time" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" coreTime "github.com/OffchainLabs/prysm/v6/beacon-chain/core/time" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition" "github.com/OffchainLabs/prysm/v6/beacon-chain/das" @@ -239,8 +241,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo return err } } + if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil { - return errors.Wrapf(err, "could not validate blob data availability at slot %d", b.Block().Slot()) + return errors.Wrapf(err, "could not validate sidecar availability at slot %d", b.Block().Slot()) } args := &forkchoicetypes.BlockAndCheckpoints{Block: b, JustifiedCheckpoint: jCheckpoints[i], @@ -578,12 +581,12 @@ func (s *Service) runLateBlockTasks() { } } -// missingIndices uses the expected commitments from the block to determine +// missingBlobIndices uses the expected commitments from the block to determine // which BlobSidecar indices would need to be in the database for DA success. // It returns a map where each key represents a missing BlobSidecar index. // An empty map means we have all indices; a non-empty map can be used to compare incoming // BlobSidecars against the set of known missing sidecars. -func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte, slot primitives.Slot) (map[uint64]struct{}, error) { +func missingBlobIndices(bs *filesystem.BlobStorage, root [fieldparams.RootLength]byte, expected [][]byte, slot primitives.Slot) (map[uint64]bool, error) { maxBlobsPerBlock := params.BeaconConfig().MaxBlobsPerBlock(slot) if len(expected) == 0 { return nil, nil @@ -592,29 +595,223 @@ func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte return nil, errMaxBlobsExceeded } indices := bs.Summary(root) - missing := make(map[uint64]struct{}, len(expected)) + missing := make(map[uint64]bool, len(expected)) for i := range expected { if len(expected[i]) > 0 && !indices.HasIndex(uint64(i)) { - missing[uint64(i)] = struct{}{} + missing[uint64(i)] = true } } return missing, nil } -// isDataAvailable blocks until all BlobSidecars committed to in the block are available, -// or an error or context cancellation occurs. A nil result means that the data availability check is successful. -// The function will first check the database to see if all sidecars have been persisted. If any -// sidecars are missing, it will then read from the blobNotifier channel for the given root until the channel is -// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars. -func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error { - if signed.Version() < version.Deneb { - return nil +// missingDataColumnIndices uses the expected data columns from the block to determine +// which DataColumnSidecar indices would need to be in the database for DA success. 
+// It returns a map where each key represents a missing DataColumnSidecar index. +// An empty map means we have all indices; a non-empty map can be used to compare incoming +// DataColumns against the set of known missing sidecars. +func missingDataColumnIndices(bs *filesystem.DataColumnStorage, root [fieldparams.RootLength]byte, expected map[uint64]bool) (map[uint64]bool, error) { + if len(expected) == 0 { + return nil, nil } - block := signed.Block() + } + + numberOfColumns := params.BeaconConfig().NumberOfColumns + + if uint64(len(expected)) > numberOfColumns { + return nil, errMaxDataColumnsExceeded + } + + // Get a summary of the data columns stored in the database. + summary := bs.Summary(root) + + // Check all expected data columns against the summary. + missing := make(map[uint64]bool) + for column := range expected { + if !summary.HasIndex(column) { + missing[column] = true + } + } + + return missing, nil +} + +// isDataAvailable blocks until all sidecars committed to in the block are available, +// or an error or context cancellation occurs. A nil result means that the data availability check is successful. +// The function will first check the database to see if all sidecars have been persisted. If any +// sidecars are missing, it will then read from the sidecar notifier channel for the given root until the channel is +// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars. +func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signedBlock interfaces.ReadOnlySignedBeaconBlock) error { + block := signedBlock.Block() if block == nil { return errors.New("invalid nil beacon block") } + + blockVersion := block.Version() + if blockVersion >= version.Fulu { + return s.areDataColumnsAvailable(ctx, root, block) + } + + if blockVersion >= version.Deneb { + return s.areBlobsAvailable(ctx, root, block) + } + + return nil +} + +// areDataColumnsAvailable blocks until all data columns committed to in the block are available, +// or an error or context cancellation occurs. A nil result means that the data availability check is successful. +func (s *Service) areDataColumnsAvailable(ctx context.Context, root [fieldparams.RootLength]byte, block interfaces.ReadOnlyBeaconBlock) error { + // We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS + blockSlot, currentSlot := block.Slot(), s.CurrentSlot() + blockEpoch, currentEpoch := slots.ToEpoch(blockSlot), slots.ToEpoch(currentSlot) + + if !params.WithinDAPeriod(blockEpoch, currentEpoch) { + return nil + } + + body := block.Body() + if body == nil { + return errors.New("invalid nil beacon block body") + } + + kzgCommitments, err := body.BlobKzgCommitments() + if err != nil { + return errors.Wrap(err, "blob KZG commitments") + } + + // If the block has no commitments, there is nothing to wait for. + if len(kzgCommitments) == 0 { + return nil + } + + // All columns to sample need to be available for the block to be considered available. + // https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#custody-sampling + nodeID := s.cfg.P2P.NodeID() + + // Prevent the custody group count from changing during the rest of the function. + s.cfg.CustodyInfo.Mut.RLock() + defer s.cfg.CustodyInfo.Mut.RUnlock() + + // Get the custody group sampling size for the node.
+ custodyGroupSamplingSize := s.cfg.CustodyInfo.CustodyGroupSamplingSize(peerdas.Actual) + peerInfo, _, err := peerdas.Info(nodeID, custodyGroupSamplingSize) + if err != nil { + return errors.Wrap(err, "peer info") + } + + // Subscribe to data columns newly stored in the database. + subscription, identsChan := s.dataColumnStorage.Subscribe() + defer subscription.Unsubscribe() + + // Get the count of data columns we already have in the store. + summary := s.dataColumnStorage.Summary(root) + storedDataColumnsCount := summary.Count() + + minimumColumnCountToReconstruct := peerdas.MinimumColumnsCountToReconstruct() + + // As soon as we have enough data column sidecars, we can reconstruct the missing ones. + // We don't need to wait for the rest of the data columns to declare the block as available. + if storedDataColumnsCount >= minimumColumnCountToReconstruct { + return nil + } + + // Get a map of data column indices that are not currently available. + missingMap, err := missingDataColumnIndices(s.dataColumnStorage, root, peerInfo.CustodyColumns) + if err != nil { + return errors.Wrap(err, "missing data columns") + } + + // If there are no missing indices, all data column sidecars are available. + // This is the happy path. + if len(missingMap) == 0 { + return nil + } + + // Log for DA checks that cross over into the next slot; helpful for debugging. + nextSlot := slots.BeginsAt(block.Slot()+1, s.genesisTime) + + // Avoid logging if DA check is called after next slot start. + if nextSlot.After(time.Now()) { + timer := time.AfterFunc(time.Until(nextSlot), func() { + missingMapCount := uint64(len(missingMap)) + + if missingMapCount == 0 { + return + } + + var ( + expected interface{} = "all" + missing interface{} = "all" + ) + + numberOfColumns := params.BeaconConfig().NumberOfColumns + colMapCount := uint64(len(peerInfo.CustodyColumns)) + + if colMapCount < numberOfColumns { + expected = uint64MapToSortedSlice(peerInfo.CustodyColumns) + } + + if missingMapCount < numberOfColumns { + missing = uint64MapToSortedSlice(missingMap) + } + + log.WithFields(logrus.Fields{ + "slot": block.Slot(), + "root": fmt.Sprintf("%#x", root), + "columnsExpected": expected, + "columnsWaiting": missing, + }).Warning("Data columns still missing at slot end") + }) + defer timer.Stop() + } + + for { + select { + case idents := <-identsChan: + if idents.Root != root { + // This is not the root we are looking for. + continue + } + + for _, index := range idents.Indices { + // This is a data column we are expecting. + if _, ok := missingMap[index]; ok { + storedDataColumnsCount++ + } + + // As soon as we have more than half of the data columns, we can reconstruct the missing ones. + // We don't need to wait for the rest of the data columns to declare the block as available. + if storedDataColumnsCount >= minimumColumnCountToReconstruct { + return nil + } + + // Remove the index from the missing map. + delete(missingMap, index) + + // Return if there are no more missing data columns.
+ if len(missingMap) == 0 { + return nil + } + } + + case <-ctx.Done(): + var missingIndices interface{} = "all" + numberOfColumns := params.BeaconConfig().NumberOfColumns + missingIndicesCount := uint64(len(missingMap)) + + if missingIndicesCount < numberOfColumns { + missingIndices = uint64MapToSortedSlice(missingMap) + } + + return errors.Wrapf(ctx.Err(), "data column sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndices) + } + } +} + +// areBlobsAvailable blocks until all BlobSidecars committed to in the block are available, +// or an error or context cancellation occurs. A nil result means that the data availability check is successful. +func (s *Service) areBlobsAvailable(ctx context.Context, root [fieldparams.RootLength]byte, block interfaces.ReadOnlyBeaconBlock) error { + blockSlot := block.Slot() + // We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS if !params.WithinDAPeriod(slots.ToEpoch(block.Slot()), slots.ToEpoch(s.CurrentSlot())) { return nil @@ -634,9 +831,9 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int return nil } // get a map of BlobSidecar indices that are not currently available. - missing, err := missingIndices(s.blobStorage, root, kzgCommitments, block.Slot()) + missing, err := missingBlobIndices(s.blobStorage, root, kzgCommitments, block.Slot()) if err != nil { - return err + return errors.Wrap(err, "missing indices") } // If there are no missing indices, all BlobSidecars are available. if len(missing) == 0 { @@ -648,15 +845,20 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int nc := s.blobNotifiers.forRoot(root, block.Slot()) // Log for DA checks that cross over into the next slot; helpful for debugging. - nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime) + nextSlot := slots.BeginsAt(block.Slot()+1, s.genesisTime) // Avoid logging if DA check is called after next slot start. if nextSlot.After(time.Now()) { nst := time.AfterFunc(time.Until(nextSlot), func() { if len(missing) == 0 { return } - log.WithFields(daCheckLogFields(root, signed.Block().Slot(), expected, len(missing))). - Error("Still waiting for DA check at slot end.") + + log.WithFields(logrus.Fields{ + "slot": blockSlot, + "root": fmt.Sprintf("%#x", root), + "blobsExpected": expected, + "blobsWaiting": len(missing), + }).Error("Still waiting for blobs DA check at slot end.") }) defer nst.Stop() } @@ -678,13 +880,14 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int } } -func daCheckLogFields(root [32]byte, slot primitives.Slot, expected, missing int) logrus.Fields { - return logrus.Fields{ - "slot": slot, - "root": fmt.Sprintf("%#x", root), - "blobsExpected": expected, - "blobsWaiting": missing, +// uint64MapToSortedSlice produces a sorted uint64 slice from a map. 
+func uint64MapToSortedSlice(input map[uint64]bool) []uint64 { + output := make([]uint64, 0, len(input)) + for idx := range input { + output = append(output, idx) } + slices.Sort[[]uint64](output) + return output } // lateBlockTasks is called 4 seconds into the slot and performs tasks @@ -770,7 +973,7 @@ func (s *Service) waitForSync() error { } } -func (s *Service) handleInvalidExecutionError(ctx context.Context, err error, blockRoot [32]byte, parentRoot [32]byte) error { +func (s *Service) handleInvalidExecutionError(ctx context.Context, err error, blockRoot, parentRoot [fieldparams.RootLength]byte) error { if IsInvalidBlock(err) && InvalidBlockLVH(err) != [32]byte{} { return s.pruneInvalidBlock(ctx, blockRoot, parentRoot, InvalidBlockLVH(err)) } diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index b603e4c973..342091ca7c 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -310,7 +310,7 @@ func (s *Service) processLightClientFinalityUpdate( Data: newUpdate, }) - if err = s.cfg.P2p.BroadcastLightClientFinalityUpdate(ctx, newUpdate); err != nil { + if err = s.cfg.P2P.BroadcastLightClientFinalityUpdate(ctx, newUpdate); err != nil { return errors.Wrap(err, "could not broadcast light client finality update") } @@ -363,7 +363,7 @@ func (s *Service) processLightClientOptimisticUpdate(ctx context.Context, signed Data: newUpdate, }) - if err = s.cfg.P2p.BroadcastLightClientOptimisticUpdate(ctx, newUpdate); err != nil { + if err = s.cfg.P2P.BroadcastLightClientOptimisticUpdate(ctx, newUpdate); err != nil { return errors.Wrap(err, "could not broadcast light client optimistic update") } diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index 0894a36e29..9824a0c4b9 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -13,6 +13,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition" "github.com/OffchainLabs/prysm/v6/beacon-chain/das" @@ -2331,13 +2332,13 @@ func driftGenesisTime(s *Service, slot, delay int64) { s.cfg.ForkChoiceStore.SetGenesisTime(uint64(newTime.Unix())) } -func TestMissingIndices(t *testing.T) { +func TestMissingBlobIndices(t *testing.T) { cases := []struct { name string expected [][]byte present []uint64 result map[uint64]struct{} - root [32]byte + root [fieldparams.RootLength]byte err error }{ { @@ -2395,7 +2396,7 @@ func TestMissingIndices(t *testing.T) { bm, bs := filesystem.NewEphemeralBlobStorageWithMocker(t) t.Run(c.name, func(t *testing.T) { require.NoError(t, bm.CreateFakeIndices(c.root, 0, c.present...)) - missing, err := missingIndices(bs, c.root, c.expected, 0) + missing, err := missingBlobIndices(bs, c.root, c.expected, 0) if c.err != nil { require.ErrorIs(t, err, c.err) return @@ -2403,9 +2404,70 @@ func TestMissingIndices(t *testing.T) { require.NoError(t, err) require.Equal(t, len(c.result), len(missing)) for key := range c.result { - m, ok := missing[key] - require.Equal(t, true, ok) - require.Equal(t, c.result[key], m) + require.Equal(t, true, missing[key]) + } + }) + } +} + +func 
TestMissingDataColumnIndices(t *testing.T) { + countPlusOne := params.BeaconConfig().NumberOfColumns + 1 + tooManyColumns := make(map[uint64]bool, countPlusOne) + for i := range countPlusOne { + tooManyColumns[uint64(i)] = true + } + + testCases := []struct { + name string + storedIndices []uint64 + input map[uint64]bool + expected map[uint64]bool + err error + }{ + { + name: "zero len expected", + input: map[uint64]bool{}, + }, + { + name: "expected exceeds max", + input: tooManyColumns, + err: errMaxDataColumnsExceeded, + }, + { + name: "all missing", + storedIndices: []uint64{}, + input: map[uint64]bool{0: true, 1: true, 2: true}, + expected: map[uint64]bool{0: true, 1: true, 2: true}, + }, + { + name: "none missing", + input: map[uint64]bool{0: true, 1: true, 2: true}, + expected: map[uint64]bool{}, + storedIndices: []uint64{0, 1, 2, 3, 4}, // Extra columns stored but not expected + }, + { + name: "some missing", + storedIndices: []uint64{0, 20}, + input: map[uint64]bool{0: true, 10: true, 20: true, 30: true}, + expected: map[uint64]bool{10: true, 30: true}, + }, + } + + var emptyRoot [fieldparams.RootLength]byte + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dcm, dcs := filesystem.NewEphemeralDataColumnStorageWithMocker(t) + err := dcm.CreateFakeIndices(emptyRoot, 0, tc.storedIndices...) + require.NoError(t, err) + + // Test the function + actual, err := missingDataColumnIndices(dcs, emptyRoot, tc.input) + require.ErrorIs(t, err, tc.err) + + require.Equal(t, len(tc.expected), len(actual)) + for key := range tc.expected { + require.Equal(t, true, actual[key]) } }) } @@ -3246,6 +3308,193 @@ func TestSaveLightClientBootstrap(t *testing.T) { reset() } +type testIsAvailableParams struct { + options []Option + blobKzgCommitmentsCount uint64 + columnsToSave []uint64 +} + +func testIsAvailableSetup(t *testing.T, params testIsAvailableParams) (context.Context, context.CancelFunc, *Service, [fieldparams.RootLength]byte, interfaces.SignedBeaconBlock) { + ctx, cancel := context.WithCancel(context.Background()) + dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t) + + options := append(params.options, WithDataColumnStorage(dataColumnStorage)) + service, _ := minimalTestService(t, options...) 
+ + genesisState, secretKeys := util.DeterministicGenesisStateElectra(t, 32 /*validator count*/) + + err := service.saveGenesisData(ctx, genesisState) + require.NoError(t, err) + + conf := util.DefaultBlockGenConfig() + conf.NumBlobKzgCommitments = params.blobKzgCommitmentsCount + + signedBeaconBlock, err := util.GenerateFullBlockFulu(genesisState, secretKeys, conf, 10 /*block slot*/) + require.NoError(t, err) + + root, err := signedBeaconBlock.Block.HashTreeRoot() + require.NoError(t, err) + + dataColumnsParams := make([]util.DataColumnParams, 0, len(params.columnsToSave)) + for _, i := range params.columnsToSave { + dataColumnParam := util.DataColumnParams{ColumnIndex: i} + dataColumnsParams = append(dataColumnsParams, dataColumnParam) + } + + dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: dataColumnsParams} + _, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot) + + err = dataColumnStorage.Save(verifiedRODataColumns) + require.NoError(t, err) + + signed, err := consensusblocks.NewSignedBeaconBlock(signedBeaconBlock) + require.NoError(t, err) + + return ctx, cancel, service, root, signed +} + +func TestIsDataAvailable(t *testing.T) { + t.Run("Fulu - out of retention window", func(t *testing.T) { + params := testIsAvailableParams{options: []Option{WithGenesisTime(time.Unix(0, 0))}} + ctx, _, service, root, signed := testIsAvailableSetup(t, params) + + err := service.isDataAvailable(ctx, root, signed) + require.NoError(t, err) + }) + + t.Run("Fulu - no commitment in blocks", func(t *testing.T) { + ctx, _, service, root, signed := testIsAvailableSetup(t, testIsAvailableParams{}) + + err := service.isDataAvailable(ctx, root, signed) + require.NoError(t, err) + }) + + t.Run("Fulu - more than half of the columns in custody", func(t *testing.T) { + minimumColumnsCountToReconstruct := peerdas.MinimumColumnsCountToReconstruct() + indices := make([]uint64, 0, minimumColumnsCountToReconstruct) + for i := range minimumColumnsCountToReconstruct { + indices = append(indices, i) + } + + params := testIsAvailableParams{ + options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{})}, + columnsToSave: indices, + blobKzgCommitmentsCount: 3, + } + + ctx, _, service, root, signed := testIsAvailableSetup(t, params) + + err := service.isDataAvailable(ctx, root, signed) + require.NoError(t, err) + }) + + t.Run("Fulu - no missing data columns", func(t *testing.T) { + params := testIsAvailableParams{ + options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{})}, + columnsToSave: []uint64{1, 17, 19, 42, 75, 87, 102, 117, 119}, // 119 is not needed + blobKzgCommitmentsCount: 3, + } + + ctx, _, service, root, signed := testIsAvailableSetup(t, params) + + err := service.isDataAvailable(ctx, root, signed) + require.NoError(t, err) + }) + + t.Run("Fulu - some initially missing data columns (no reconstruction)", func(t *testing.T) { + testParams := testIsAvailableParams{ + options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{})}, + columnsToSave: []uint64{1, 17, 19, 75, 102, 117, 119}, // 119 is not needed, 42 and 87 are missing + + blobKzgCommitmentsCount: 3, + } + + ctx, _, service, root, signed := testIsAvailableSetup(t, testParams) + + var wrongRoot [fieldparams.RootLength]byte + copy(wrongRoot[:], root[:]) + wrongRoot[0]++ // change the root to simulate a wrong root + + _, verifiedSidecarsWrongRoot := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{wrongRoot: { + {ColumnIndex: 42}, // needed + }}) + + _, verifiedSidecars 
:= util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{root: { + {ColumnIndex: 87}, // needed + {ColumnIndex: 1}, // not needed + {ColumnIndex: 42}, // needed + }}) + + time.AfterFunc(10*time.Millisecond, func() { + err := service.dataColumnStorage.Save(verifiedSidecarsWrongRoot) + require.NoError(t, err) + + err = service.dataColumnStorage.Save(verifiedSidecars) + require.NoError(t, err) + }) + + err := service.isDataAvailable(ctx, root, signed) + require.NoError(t, err) + }) + + t.Run("Fulu - some initially missing data columns (reconstruction)", func(t *testing.T) { + const ( + missingColumns = uint64(2) + cgc = 128 + ) + var custodyInfo peerdas.CustodyInfo + custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc) + custodyInfo.ToAdvertiseGroupCount.Set(cgc) + + minimumColumnsCountToReconstruct := peerdas.MinimumColumnsCountToReconstruct() + indices := make([]uint64, 0, minimumColumnsCountToReconstruct-missingColumns) + + for i := range minimumColumnsCountToReconstruct - missingColumns { + indices = append(indices, i) + } + + testParams := testIsAvailableParams{ + options: []Option{WithCustodyInfo(&custodyInfo)}, + columnsToSave: indices, + blobKzgCommitmentsCount: 3, + } + + ctx, _, service, root, signed := testIsAvailableSetup(t, testParams) + + dataColumnParams := make([]util.DataColumnParams, 0, missingColumns) + for i := minimumColumnsCountToReconstruct - missingColumns; i < minimumColumnsCountToReconstruct; i++ { + dataColumnParam := util.DataColumnParams{ColumnIndex: i} + dataColumnParams = append(dataColumnParams, dataColumnParam) + } + + _, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{root: dataColumnParams}) + + time.AfterFunc(10*time.Millisecond, func() { + err := service.dataColumnStorage.Save(verifiedSidecars) + require.NoError(t, err) + }) + + err := service.isDataAvailable(ctx, root, signed) + require.NoError(t, err) + }) + + t.Run("Fulu - some columns are definitively missing", func(t *testing.T) { + params := testIsAvailableParams{ + options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{})}, + blobKzgCommitmentsCount: 3, + } + + ctx, cancel, service, root, signed := testIsAvailableSetup(t, params) + + time.AfterFunc(10*time.Millisecond, func() { + cancel() + }) + + err := service.isDataAvailable(ctx, root, signed) + require.NotNil(t, err) + }) +} + func setupLightClientTestRequirements(ctx context.Context, t *testing.T, s *Service, v int, options ...util.LightClientOption) (*util.TestLightClient, *postBlockProcessConfig) { var l *util.TestLightClient switch v { @@ -3310,7 +3559,7 @@ func TestProcessLightClientOptimisticUpdate(t *testing.T) { params.OverrideBeaconConfig(beaconCfg) s, tr := minimalTestService(t) - s.cfg.P2p = &mockp2p.FakeP2P{} + s.cfg.P2P = &mockp2p.FakeP2P{} ctx := tr.ctx testCases := []struct { @@ -3446,7 +3695,7 @@ func TestProcessLightClientFinalityUpdate(t *testing.T) { params.OverrideBeaconConfig(beaconCfg) s, tr := minimalTestService(t) - s.cfg.P2p = &mockp2p.FakeP2P{} + s.cfg.P2P = &mockp2p.FakeP2P{} ctx := tr.ctx testCases := []struct { diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 8d21dfa953..10a9328aec 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -16,6 +16,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/slasher/types" "github.com/OffchainLabs/prysm/v6/beacon-chain/state" "github.com/OffchainLabs/prysm/v6/config/features" + 
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" @@ -52,6 +53,13 @@ type BlobReceiver interface { ReceiveBlob(context.Context, blocks.VerifiedROBlob) error } +// DataColumnReceiver interface defines the methods of chain service for receiving new +// data columns +type DataColumnReceiver interface { + ReceiveDataColumn(blocks.VerifiedRODataColumn) error + ReceiveDataColumns([]blocks.VerifiedRODataColumn) error +} + // SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire. type SlashingReceiver interface { ReceiveAttesterSlashing(ctx context.Context, slashing ethpb.AttSlashing) @@ -74,6 +82,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Ignoring already synced block") return nil } + receivedTime := time.Now() s.blockBeingSynced.set(blockRoot) defer s.blockBeingSynced.unset(blockRoot) @@ -82,6 +91,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig if err != nil { return err } + preState, err := s.getBlockPreState(ctx, blockCopy.Block()) if err != nil { return errors.Wrap(err, "could not get block's prestate") @@ -97,10 +107,12 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig if err != nil { return err } + daWaitedTime, err := s.handleDA(ctx, blockCopy, blockRoot, avs) if err != nil { return err } + // Defragment the state before continuing block processing. s.defragmentState(postState) @@ -227,26 +239,34 @@ func (s *Service) validateExecutionAndConsensus( func (s *Service) handleDA( ctx context.Context, block interfaces.SignedBeaconBlock, - blockRoot [32]byte, + blockRoot [fieldparams.RootLength]byte, avs das.AvailabilityStore, -) (time.Duration, error) { - daStartTime := time.Now() - if avs != nil { - rob, err := blocks.NewROBlockWithRoot(block, blockRoot) - if err != nil { - return 0, err +) (elapsed time.Duration, err error) { + defer func(start time.Time) { + elapsed = time.Since(start) + + if err == nil { + dataAvailWaitedTime.Observe(float64(elapsed.Milliseconds())) } - if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), rob); err != nil { - return 0, errors.Wrap(err, "could not validate blob data availability (AvailabilityStore.IsDataAvailable)") - } - } else { - if err := s.isDataAvailable(ctx, blockRoot, block); err != nil { - return 0, errors.Wrap(err, "could not validate blob data availability") + }(time.Now()) + + if avs == nil { + if err = s.isDataAvailable(ctx, blockRoot, block); err != nil { + return } + + return } - daWaitedTime := time.Since(daStartTime) - dataAvailWaitedTime.Observe(float64(daWaitedTime.Milliseconds())) - return daWaitedTime, nil + + var rob blocks.ROBlock + rob, err = blocks.NewROBlockWithRoot(block, blockRoot) + if err != nil { + return + } + + err = avs.IsDataAvailable(ctx, s.CurrentSlot(), rob) + + return } func (s *Service) reportPostBlockProcessing( diff --git a/beacon-chain/blockchain/receive_block_test.go b/beacon-chain/blockchain/receive_block_test.go index 2e06d40a6e..f40469f656 100644 --- a/beacon-chain/blockchain/receive_block_test.go +++ b/beacon-chain/blockchain/receive_block_test.go @@ -180,6 +180,19 @@ func TestService_ReceiveBlock(t *testing.T) { } wg.Wait() } +func TestHandleDA(t *testing.T) { + signedBeaconBlock, err 
:= blocks.NewSignedBeaconBlock(&ethpb.SignedBeaconBlock{ + Block: &ethpb.BeaconBlock{ + Body: &ethpb.BeaconBlockBody{}, + }, + }) + require.NoError(t, err) + + s, _ := minimalTestService(t) + elapsed, err := s.handleDA(context.Background(), signedBeaconBlock, [fieldparams.RootLength]byte{}, nil) + require.NoError(t, err) + require.Equal(t, true, elapsed > 0, "Elapsed time should be greater than 0") +} func TestService_ReceiveBlockUpdateHead(t *testing.T) { s, tr := minimalTestService(t, diff --git a/beacon-chain/blockchain/receive_data_column.go b/beacon-chain/blockchain/receive_data_column.go new file mode 100644 index 0000000000..f58e5dee12 --- /dev/null +++ b/beacon-chain/blockchain/receive_data_column.go @@ -0,0 +1,25 @@ +package blockchain + +import ( + "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" + "github.com/pkg/errors" +) + +// ReceiveDataColumns receives a batch of data columns. +func (s *Service) ReceiveDataColumns(dataColumnSidecars []blocks.VerifiedRODataColumn) error { + if err := s.dataColumnStorage.Save(dataColumnSidecars); err != nil { + return errors.Wrap(err, "save data column sidecars") + } + + return nil +} + +// ReceiveDataColumn receives a single data column. +// (It is only a wrapper around ReceiveDataColumns.) +func (s *Service) ReceiveDataColumn(dataColumnSidecar blocks.VerifiedRODataColumn) error { + if err := s.dataColumnStorage.Save([]blocks.VerifiedRODataColumn{dataColumnSidecar}); err != nil { + return errors.Wrap(err, "save data column sidecars") + } + + return nil +} diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 843f97d669..b370f8f474 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -16,6 +16,7 @@ import ( statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" coreTime "github.com/OffchainLabs/prysm/v6/beacon-chain/core/time" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition" "github.com/OffchainLabs/prysm/v6/beacon-chain/db" @@ -64,6 +65,7 @@ type Service struct { blobNotifiers *blobNotifierMap blockBeingSynced *currentlySyncingBlock blobStorage *filesystem.BlobStorage + dataColumnStorage *filesystem.DataColumnStorage slasherEnabled bool lcStore *lightClient.Store } @@ -81,7 +83,7 @@ type config struct { ExitPool voluntaryexits.PoolManager SlashingPool slashings.PoolManager BLSToExecPool blstoexec.PoolManager - P2p p2p.Broadcaster + P2P p2p.Accessor MaxRoutines int StateNotifier statefeed.Notifier ForkChoiceStore f.ForkChoicer @@ -93,6 +95,7 @@ type config struct { FinalizedStateAtStartUp state.BeaconState ExecutionEngineCaller execution.EngineCaller SyncChecker Checker + CustodyInfo *peerdas.CustodyInfo } // Checker is an interface used to determine if a node is in initial sync diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index d68835bf8a..523452c067 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -97,7 +97,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service { WithAttestationPool(attestations.NewPool()), WithSlashingPool(slashings.NewPool()), WithExitPool(voluntaryexits.NewPool()), - WithP2PBroadcaster(&mockBroadcaster{}), + WithP2PBroadcaster(&mockAccessor{}), WithStateNotifier(&mockBeaconNode{}),
WithForkChoiceStore(fc), WithAttestationService(attService), diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index f9f7c01580..d0041ecb80 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -49,7 +49,7 @@ type mockBroadcaster struct { broadcastCalled bool } -type mockAccesser struct { +type mockAccessor struct { mockBroadcaster p2pTesting.MockPeerManager } @@ -144,9 +144,10 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq WithDepositCache(dc), WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()), WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), + WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)), WithSyncChecker(mock.MockChecker{}), WithExecutionEngineCaller(&mockExecution.EngineClient{}), - WithP2PBroadcaster(&mockAccesser{}), + WithP2PBroadcaster(&mockAccessor{}), WithLightClientStore(&lightclient.Store{}), } // append the variadic opts so they override the defaults by being processed afterwards diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index 40b25904c7..292e6364d1 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -75,6 +75,7 @@ type ChainService struct { BlockSlot primitives.Slot SyncingRoot [32]byte Blobs []blocks.VerifiedROBlob + DataColumns []blocks.VerifiedRODataColumn TargetRoot [32]byte MockHeadSlot *primitives.Slot } @@ -715,6 +716,17 @@ func (c *ChainService) ReceiveBlob(_ context.Context, b blocks.VerifiedROBlob) e return nil } +// ReceiveDataColumn implements the same method in chain service +func (c *ChainService) ReceiveDataColumn(dc blocks.VerifiedRODataColumn) error { + c.DataColumns = append(c.DataColumns, dc) + return nil +} + +// ReceiveDataColumns implements the same method in chain service +func (*ChainService) ReceiveDataColumns(_ []blocks.VerifiedRODataColumn) error { + return nil +} + // TargetRootForEpoch mocks the same method in the chain service func (c *ChainService) TargetRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) { return c.TargetRoot, nil diff --git a/beacon-chain/db/filesystem/mock.go b/beacon-chain/db/filesystem/mock.go index f1fe0a3fc2..b96e5ea6d0 100644 --- a/beacon-chain/db/filesystem/mock.go +++ b/beacon-chain/db/filesystem/mock.go @@ -8,6 +8,7 @@ import ( "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/time/slots" + "github.com/pkg/errors" "github.com/spf13/afero" ) @@ -46,14 +47,21 @@ func NewWarmedEphemeralBlobStorageUsingFs(t testing.TB, fs afero.Fs, opts ...Blo return bs } -type BlobMocker struct { - fs afero.Fs - bs *BlobStorage -} +type ( + BlobMocker struct { + fs afero.Fs + bs *BlobStorage + } + + DataColumnMocker struct { + fs afero.Fs + dcs *DataColumnStorage + } +) // CreateFakeIndices creates empty blob sidecar files at the expected path for the given // root and indices to influence the result of Indices(). 
-func (bm *BlobMocker) CreateFakeIndices(root [32]byte, slot primitives.Slot, indices ...uint64) error { +func (bm *BlobMocker) CreateFakeIndices(root [fieldparams.RootLength]byte, slot primitives.Slot, indices ...uint64) error { for i := range indices { if err := bm.bs.layout.notify(newBlobIdent(root, slots.ToEpoch(slot), indices[i])); err != nil { return err @@ -62,6 +70,17 @@ func (bm *BlobMocker) CreateFakeIndices(root [32]byte, slot primitives.Slot, ind return nil } +// CreateFakeIndices records fake data column sidecar indices in the storage cache for the given +// root, slot and indices to influence the result of Summary(). +func (bm *DataColumnMocker) CreateFakeIndices(root [fieldparams.RootLength]byte, slot primitives.Slot, indices ...uint64) error { + err := bm.dcs.cache.set(DataColumnsIdent{Root: root, Epoch: slots.ToEpoch(slot), Indices: indices}) + if err != nil { + return errors.Wrap(err, "cache set") + } + + return nil +} + // NewEphemeralBlobStorageWithMocker returns a *BlobMocker value in addition to the BlobStorage value. // BlockMocker encapsulates things blob path construction to avoid leaking implementation details. func NewEphemeralBlobStorageWithMocker(t testing.TB) (*BlobMocker, *BlobStorage) { @@ -119,6 +138,13 @@ func NewEphemeralDataColumnStorageUsingFs(t testing.TB, fs afero.Fs, opts ...Dat return bs } +// NewEphemeralDataColumnStorageWithMocker returns a *DataColumnMocker value in addition to the DataColumnStorage value. +// DataColumnMocker encapsulates data column path construction to avoid leaking implementation details. +func NewEphemeralDataColumnStorageWithMocker(t testing.TB) (*DataColumnMocker, *DataColumnStorage) { + fs, dcs := NewEphemeralDataColumnStorageAndFs(t) + return &DataColumnMocker{fs: fs, dcs: dcs}, dcs +} + func NewMockDataColumnStorageSummarizer(t *testing.T, set map[[fieldparams.RootLength]byte][]uint64) DataColumnStorageSummarizer { c := newDataColumnStorageSummaryCache() for root, indices := range set { diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 2b4d81fb76..640bca6371 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -20,96 +20,106 @@ import ( "google.golang.org/protobuf/proto" ) -// P2P represents the full p2p interface composed of all of the sub-interfaces. -type P2P interface { - Broadcaster - SetStreamHandler - PubSubProvider - PubSubTopicUser - SenderEncoder - PeerManager - ConnectionHandler - PeersProvider - MetadataProvider -} +type ( + // P2P represents the full p2p interface composed of all of the sub-interfaces. + P2P interface { + Broadcaster + SetStreamHandler + PubSubProvider + PubSubTopicUser + SenderEncoder + PeerManager + ConnectionHandler + PeersProvider + MetadataProvider + DataColumnsHandler + } -// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
-type Broadcaster interface { - Broadcast(context.Context, proto.Message) error - BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error - BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error - BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error - BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error - BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error - BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, peersChecked ...chan<- bool) error -} + // Accessor provides access to the Broadcaster and PeerManager interfaces. + Accessor interface { + Broadcaster + PeerManager + } -// SetStreamHandler configures p2p to handle streams of a certain topic ID. -type SetStreamHandler interface { - SetStreamHandler(topic string, handler network.StreamHandler) -} + // Broadcaster broadcasts messages to peers over the p2p pubsub protocol. + Broadcaster interface { + Broadcast(context.Context, proto.Message) error + BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error + BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error + BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error + BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error + BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error + BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, peersChecked ...chan<- bool) error + } -// PubSubTopicUser provides way to join, use and leave PubSub topics. -type PubSubTopicUser interface { - JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) - LeaveTopic(topic string) error - PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error - SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) -} + // SetStreamHandler configures p2p to handle streams of a certain topic ID. + SetStreamHandler interface { + SetStreamHandler(topic string, handler network.StreamHandler) + } -// ConnectionHandler configures p2p to handle connections with a peer. -type ConnectionHandler interface { - AddConnectionHandler(f func(ctx context.Context, id peer.ID) error, - j func(ctx context.Context, id peer.ID) error) - AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) - connmgr.ConnectionGater -} + // PubSubTopicUser provides way to join, use and leave PubSub topics. + PubSubTopicUser interface { + JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) + LeaveTopic(topic string) error + PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error + SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) + } -// SenderEncoder allows sending functionality from libp2p as well as encoding for requests and responses. -type SenderEncoder interface { - EncodingProvider - Sender -} + // ConnectionHandler configures p2p to handle connections with a peer. 
+ ConnectionHandler interface { + AddConnectionHandler(f func(ctx context.Context, id peer.ID) error, + j func(ctx context.Context, id peer.ID) error) + AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) + connmgr.ConnectionGater + } -// EncodingProvider provides p2p network encoding. -type EncodingProvider interface { - Encoding() encoder.NetworkEncoding -} + // SenderEncoder allows sending functionality from libp2p as well as encoding for requests and responses. + SenderEncoder interface { + EncodingProvider + Sender + } -// PubSubProvider provides the p2p pubsub protocol. -type PubSubProvider interface { - PubSub() *pubsub.PubSub -} + // EncodingProvider provides p2p network encoding. + EncodingProvider interface { + Encoding() encoder.NetworkEncoding + } -// PeerManager abstracts some peer management methods from libp2p. -type PeerManager interface { - Disconnect(peer.ID) error - PeerID() peer.ID - Host() host.Host - ENR() *enr.Record - NodeID() enode.ID - DiscoveryAddresses() ([]multiaddr.Multiaddr, error) - RefreshPersistentSubnets() - FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) - AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) -} + // PubSubProvider provides the p2p pubsub protocol. + PubSubProvider interface { + PubSub() *pubsub.PubSub + } -// Sender abstracts the sending functionality from libp2p. -type Sender interface { - Send(context.Context, interface{}, string, peer.ID) (network.Stream, error) -} + // PeerManager abstracts some peer management methods from libp2p. + PeerManager interface { + Disconnect(peer.ID) error + PeerID() peer.ID + Host() host.Host + ENR() *enr.Record + NodeID() enode.ID + DiscoveryAddresses() ([]multiaddr.Multiaddr, error) + RefreshPersistentSubnets() + FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) + AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) + } -// PeersProvider abstracts obtaining our current list of known peers status. -type PeersProvider interface { - Peers() *peers.Status -} + // Sender abstracts the sending functionality from libp2p. + Sender interface { + Send(context.Context, interface{}, string, peer.ID) (network.Stream, error) + } -// MetadataProvider returns the metadata related information for the local peer. -type MetadataProvider interface { - Metadata() metadata.Metadata - MetadataSeq() uint64 -} + // PeersProvider abstracts obtaining our current list of known peers status. + PeersProvider interface { + Peers() *peers.Status + } -type DataColumnsHandler interface { - CustodyGroupCountFromPeer(peer.ID) uint64 -} + // MetadataProvider returns the metadata related information for the local peer. + MetadataProvider interface { + Metadata() metadata.Metadata + MetadataSeq() uint64 + } + + // DataColumnsHandler abstracts some data columns related methods. + DataColumnsHandler interface { + CustodyGroupCountFromPeer(peer.ID) uint64 + } +) diff --git a/changelog/manu-peerdas-blockchain.md b/changelog/manu-peerdas-blockchain.md new file mode 100644 index 0000000000..3d3a9b929e --- /dev/null +++ b/changelog/manu-peerdas-blockchain.md @@ -0,0 +1,2 @@ +### Added +- PeerDAS: Implement the blockchain package. 
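For reference, a minimal sketch (not part of this diff) of how the new blockchain options introduced above compose at service construction. It mirrors minimalTestService and the spectest wiring in this diff; ctx and t are assumed to come from a test harness, NewEphemeralDataColumnStorage is the test-only backend, and an empty peerdas.CustodyInfo is an illustrative placeholder:

    dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
    opts := []blockchain.Option{
        blockchain.WithDataColumnStorage(dataColumnStorage), // PeerDAS data column store
        blockchain.WithCustodyInfo(&peerdas.CustodyInfo{}),  // custody group bookkeeping used by the DA check
        blockchain.WithGenesisTime(time.Unix(0, 0)),         // genesis time used for slot timing in the DA check
    }
    service, err := blockchain.NewService(ctx, opts...)
    require.NoError(t, err)
    _ = service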
diff --git a/testing/spectest/shared/common/forkchoice/service.go b/testing/spectest/shared/common/forkchoice/service.go index cb8a1ba925..a0649f1cdd 100644 --- a/testing/spectest/shared/common/forkchoice/service.go +++ b/testing/spectest/shared/common/forkchoice/service.go @@ -76,6 +76,7 @@ func startChainService(t testing.TB, blockchain.WithPayloadIDCache(cache.NewPayloadIDCache()), blockchain.WithClockSynchronizer(clockSync), blockchain.WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), + blockchain.WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)), blockchain.WithSyncChecker(mock.MockChecker{}), ) service, err := blockchain.NewService(context.Background(), opts...) diff --git a/testing/util/BUILD.bazel b/testing/util/BUILD.bazel index 2060ebf443..a5da3dbbc2 100644 --- a/testing/util/BUILD.bazel +++ b/testing/util/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "electra_block.go", "electra_state.go", "fulu.go", + "fulu_block.go", "helpers.go", "lightclient.go", "logging.go", diff --git a/testing/util/block.go b/testing/util/block.go index c65e2db7c1..cbc5245e94 100644 --- a/testing/util/block.go +++ b/testing/util/block.go @@ -44,6 +44,7 @@ type BlockGenConfig struct { NumDepositRequests uint64 // Only for post Electra blocks NumWithdrawalRequests uint64 // Only for post Electra blocks NumConsolidationRequests uint64 // Only for post Electra blocks + NumBlobKzgCommitments uint64 // Only for post Deneb blocks } // DefaultBlockGenConfig returns the block config that utilizes the @@ -61,6 +62,7 @@ func DefaultBlockGenConfig() *BlockGenConfig { NumConsolidationRequests: 0, NumWithdrawalRequests: 0, NumDepositRequests: 0, + NumBlobKzgCommitments: 0, } } diff --git a/testing/util/fulu_block.go b/testing/util/fulu_block.go new file mode 100644 index 0000000000..93d2308611 --- /dev/null +++ b/testing/util/fulu_block.go @@ -0,0 +1,262 @@ +package util + +import ( + "context" + "fmt" + + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/time" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition" + "github.com/OffchainLabs/prysm/v6/beacon-chain/state" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + "github.com/OffchainLabs/prysm/v6/crypto/bls" + "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" + v1 "github.com/OffchainLabs/prysm/v6/proto/engine/v1" + ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/time/slots" + "github.com/pkg/errors" + "github.com/prysmaticlabs/go-bitfield" +) + +// GenerateFullBlockFulu generates a fully valid Fulu block with the requested parameters. +// Use BlockGenConfig to declare the conditions you would like the block generated under. +// This function modifies the passed state as follows: +func GenerateFullBlockFulu(bState state.BeaconState, privs []bls.SecretKey, conf *BlockGenConfig, slot primitives.Slot) (*ethpb.SignedBeaconBlockFulu, error) { + ctx := context.Background() + currentSlot := bState.Slot() + if currentSlot > slot { + return nil, fmt.Errorf("current slot in state is larger than given slot. 
%d > %d", currentSlot, slot) + } + bState = bState.Copy() + + if conf == nil { + conf = &BlockGenConfig{} + } + + var err error + var pSlashings []*ethpb.ProposerSlashing + numToGen := conf.NumProposerSlashings + if numToGen > 0 { + pSlashings, err = generateProposerSlashings(bState, privs, numToGen) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d proposer slashings:", numToGen) + } + } + + numToGen = conf.NumAttesterSlashings + var aSlashings []*ethpb.AttesterSlashingElectra + if numToGen > 0 { + generated, err := generateAttesterSlashings(bState, privs, numToGen) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d attester slashings:", numToGen) + } + aSlashings = make([]*ethpb.AttesterSlashingElectra, len(generated)) + var ok bool + for i, s := range generated { + aSlashings[i], ok = s.(*ethpb.AttesterSlashingElectra) + if !ok { + return nil, fmt.Errorf("attester slashing has the wrong type (expected %T, got %T)", ðpb.AttesterSlashingElectra{}, s) + } + } + } + + numToGen = conf.NumAttestations + var atts []*ethpb.AttestationElectra + if numToGen > 0 { + generatedAtts, err := GenerateAttestations(bState, privs, numToGen, slot, false) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d attestations:", numToGen) + } + atts = make([]*ethpb.AttestationElectra, len(generatedAtts)) + var ok bool + for i, a := range generatedAtts { + atts[i], ok = a.(*ethpb.AttestationElectra) + if !ok { + return nil, fmt.Errorf("attestation has the wrong type (expected %T, got %T)", ðpb.AttestationElectra{}, a) + } + } + } + + numToGen = conf.NumDeposits + var newDeposits []*ethpb.Deposit + eth1Data := bState.Eth1Data() + if numToGen > 0 { + newDeposits, eth1Data, err = generateDepositsAndEth1Data(bState, numToGen) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d deposits:", numToGen) + } + } + + numToGen = conf.NumVoluntaryExits + var exits []*ethpb.SignedVoluntaryExit + if numToGen > 0 { + exits, err = generateVoluntaryExits(bState, privs, numToGen) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d attester slashings:", numToGen) + } + } + + numToGen = conf.NumTransactions + newTransactions := make([][]byte, numToGen) + for i := uint64(0); i < numToGen; i++ { + newTransactions[i] = bytesutil.Uint64ToBytesLittleEndian(i) + } + + random, err := helpers.RandaoMix(bState, time.CurrentEpoch(bState)) + if err != nil { + return nil, errors.Wrap(err, "could not process randao mix") + } + + timestamp, err := slots.ToTime(bState.GenesisTime(), slot) + if err != nil { + return nil, errors.Wrap(err, "could not get current timestamp") + } + + stCopy := bState.Copy() + stCopy, err = transition.ProcessSlots(context.Background(), stCopy, slot) + if err != nil { + return nil, err + } + + newWithdrawals := make([]*v1.Withdrawal, 0) + if conf.NumWithdrawals > 0 { + newWithdrawals, err = generateWithdrawals(bState, privs, numToGen) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d withdrawals:", numToGen) + } + } + + depositRequests := make([]*v1.DepositRequest, 0) + if conf.NumDepositRequests > 0 { + depositRequests, err = generateDepositRequests(bState, privs, conf.NumDepositRequests) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d deposit requests:", conf.NumDepositRequests) + } + } + + withdrawalRequests := make([]*v1.WithdrawalRequest, 0) + if conf.NumWithdrawalRequests > 0 { + withdrawalRequests, err = generateWithdrawalRequests(bState, privs, 
conf.NumWithdrawalRequests) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d withdrawal requests:", conf.NumWithdrawalRequests) + } + } + + consolidationRequests := make([]*v1.ConsolidationRequest, 0) + if conf.NumConsolidationRequests > 0 { + consolidationRequests, err = generateConsolidationRequests(bState, privs, conf.NumConsolidationRequests) + if err != nil { + return nil, errors.Wrapf(err, "failed generating %d consolidation requests:", conf.NumConsolidationRequests) + } + } + + executionRequests := &v1.ExecutionRequests{ + Withdrawals: withdrawalRequests, + Deposits: depositRequests, + Consolidations: consolidationRequests, + } + + parentExecution, err := stCopy.LatestExecutionPayloadHeader() + if err != nil { + return nil, err + } + blockHash := indexToHash(uint64(slot)) + newExecutionPayloadElectra := &v1.ExecutionPayloadDeneb{ + ParentHash: parentExecution.BlockHash(), + FeeRecipient: make([]byte, 20), + StateRoot: params.BeaconConfig().ZeroHash[:], + ReceiptsRoot: params.BeaconConfig().ZeroHash[:], + LogsBloom: make([]byte, 256), + PrevRandao: random, + BlockNumber: uint64(slot), + ExtraData: params.BeaconConfig().ZeroHash[:], + BaseFeePerGas: params.BeaconConfig().ZeroHash[:], + BlockHash: blockHash[:], + Timestamp: uint64(timestamp.Unix()), + Transactions: newTransactions, + Withdrawals: newWithdrawals, + } + var syncCommitteeBits []byte + currSize := new(ethpb.SyncAggregate).SyncCommitteeBits.Len() + switch currSize { + case 512: + syncCommitteeBits = bitfield.NewBitvector512() + case 32: + syncCommitteeBits = bitfield.NewBitvector32() + default: + return nil, errors.New("invalid bit vector size") + } + newSyncAggregate := &ethpb.SyncAggregate{ + SyncCommitteeBits: syncCommitteeBits, + SyncCommitteeSignature: append([]byte{0xC0}, make([]byte, 95)...), + } + + newHeader := bState.LatestBlockHeader() + prevStateRoot, err := bState.HashTreeRoot(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not hash state") + } + newHeader.StateRoot = prevStateRoot[:] + parentRoot, err := newHeader.HashTreeRoot() + if err != nil { + return nil, errors.Wrap(err, "could not hash the new header") + } + + if slot == currentSlot { + slot = currentSlot + 1 + } + + reveal, err := RandaoReveal(stCopy, time.CurrentEpoch(stCopy), privs) + if err != nil { + return nil, errors.Wrap(err, "could not compute randao reveal") + } + + idx, err := helpers.BeaconProposerIndex(ctx, stCopy) + if err != nil { + return nil, errors.Wrap(err, "could not compute beacon proposer index") + } + + changes := make([]*ethpb.SignedBLSToExecutionChange, conf.NumBLSChanges) + for i := uint64(0); i < conf.NumBLSChanges; i++ { + changes[i], err = GenerateBLSToExecutionChange(bState, privs[i+1], primitives.ValidatorIndex(i)) + if err != nil { + return nil, err + } + } + + blobKzgCommitments := make([][]byte, 0, conf.NumBlobKzgCommitments) + for range conf.NumBlobKzgCommitments { + blobKzgCommitments = append(blobKzgCommitments, make([]byte, 48)) + } + + block := &ethpb.BeaconBlockElectra{ + Slot: slot, + ParentRoot: parentRoot[:], + ProposerIndex: idx, + Body: &ethpb.BeaconBlockBodyElectra{ + Eth1Data: eth1Data, + RandaoReveal: reveal, + ProposerSlashings: pSlashings, + AttesterSlashings: aSlashings, + Attestations: atts, + VoluntaryExits: exits, + Deposits: newDeposits, + Graffiti: make([]byte, fieldparams.RootLength), + SyncAggregate: newSyncAggregate, + ExecutionPayload: newExecutionPayloadElectra, + BlsToExecutionChanges: changes, + ExecutionRequests: executionRequests, + BlobKzgCommitments:
blobKzgCommitments, }, } + + // The fork can change after processing the state + signature, err := BlockSignature(bState, block, privs) + if err != nil { + return nil, errors.Wrap(err, "could not compute block signature") + } + + return &ethpb.SignedBeaconBlockFulu{Block: block, Signature: signature.Marshal()}, nil +}
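A hedged usage sketch of GenerateFullBlockFulu, assembled from testIsAvailableSetup earlier in this diff (all identifiers come from the diff itself; the validator count, commitment count, and slot are illustrative):

    genesisState, secretKeys := util.DeterministicGenesisStateElectra(t, 32 /*validator count*/)
    conf := util.DefaultBlockGenConfig()
    conf.NumBlobKzgCommitments = 3 // each commitment is a zeroed 48-byte placeholder
    signedBlock, err := util.GenerateFullBlockFulu(genesisState, secretKeys, conf, 10 /*block slot*/)
    require.NoError(t, err)
    root, err := signedBlock.Block.HashTreeRoot()
    require.NoError(t, err)
    _ = root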