mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-08 23:18:15 -05:00
PeerDAS: Implement blockchain. (#15350)
* PeerDAS: Implement blockchain. * Fix forkchoice spectest panic. * Update beacon-chain/blockchain/error.go Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com> * Fix Preston's comment. * Fix Preston's comment. * Fix Preston's comment. * Fix Preston's comment. * Fix Preston's comment. * Fix Preston's comment. --------- Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
This commit is contained in:
@@ -3,10 +3,12 @@ package blockchain
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
coreTime "github.com/OffchainLabs/prysm/v6/beacon-chain/core/time"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
|
||||
@@ -239,8 +241,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil {
|
||||
return errors.Wrapf(err, "could not validate blob data availability at slot %d", b.Block().Slot())
|
||||
return errors.Wrapf(err, "could not validate sidecar availability at slot %d", b.Block().Slot())
|
||||
}
|
||||
args := &forkchoicetypes.BlockAndCheckpoints{Block: b,
|
||||
JustifiedCheckpoint: jCheckpoints[i],
|
||||
@@ -578,12 +581,12 @@ func (s *Service) runLateBlockTasks() {
|
||||
}
|
||||
}
|
||||
|
||||
// missingIndices uses the expected commitments from the block to determine
|
||||
// missingBlobIndices uses the expected commitments from the block to determine
|
||||
// which BlobSidecar indices would need to be in the database for DA success.
|
||||
// It returns a map where each key represents a missing BlobSidecar index.
|
||||
// An empty map means we have all indices; a non-empty map can be used to compare incoming
|
||||
// BlobSidecars against the set of known missing sidecars.
|
||||
func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte, slot primitives.Slot) (map[uint64]struct{}, error) {
|
||||
func missingBlobIndices(bs *filesystem.BlobStorage, root [fieldparams.RootLength]byte, expected [][]byte, slot primitives.Slot) (map[uint64]bool, error) {
|
||||
maxBlobsPerBlock := params.BeaconConfig().MaxBlobsPerBlock(slot)
|
||||
if len(expected) == 0 {
|
||||
return nil, nil
|
||||
@@ -592,29 +595,223 @@ func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte
|
||||
return nil, errMaxBlobsExceeded
|
||||
}
|
||||
indices := bs.Summary(root)
|
||||
missing := make(map[uint64]struct{}, len(expected))
|
||||
missing := make(map[uint64]bool, len(expected))
|
||||
for i := range expected {
|
||||
if len(expected[i]) > 0 && !indices.HasIndex(uint64(i)) {
|
||||
missing[uint64(i)] = struct{}{}
|
||||
missing[uint64(i)] = true
|
||||
}
|
||||
}
|
||||
return missing, nil
|
||||
}
|
||||
|
||||
// isDataAvailable blocks until all BlobSidecars committed to in the block are available,
|
||||
// or an error or context cancellation occurs. A nil result means that the data availability check is successful.
|
||||
// The function will first check the database to see if all sidecars have been persisted. If any
|
||||
// sidecars are missing, it will then read from the blobNotifier channel for the given root until the channel is
|
||||
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
|
||||
func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
|
||||
if signed.Version() < version.Deneb {
|
||||
return nil
|
||||
// missingDataColumnIndices uses the expected data columns from the block to determine
|
||||
// which DataColumnSidecar indices would need to be in the database for DA success.
|
||||
// It returns a map where each key represents a missing DataColumnSidecar index.
|
||||
// An empty map means we have all indices; a non-empty map can be used to compare incoming
|
||||
// DataColumns against the set of known missing sidecars.
|
||||
func missingDataColumnIndices(bs *filesystem.DataColumnStorage, root [fieldparams.RootLength]byte, expected map[uint64]bool) (map[uint64]bool, error) {
|
||||
if len(expected) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
block := signed.Block()
|
||||
numberOfColumns := params.BeaconConfig().NumberOfColumns
|
||||
|
||||
if uint64(len(expected)) > numberOfColumns {
|
||||
return nil, errMaxDataColumnsExceeded
|
||||
}
|
||||
|
||||
// Get a summary of the data columns stored in the database.
|
||||
summary := bs.Summary(root)
|
||||
|
||||
// Check all expected data columns against the summary.
|
||||
missing := make(map[uint64]bool)
|
||||
for column := range expected {
|
||||
if !summary.HasIndex(column) {
|
||||
missing[column] = true
|
||||
}
|
||||
}
|
||||
|
||||
return missing, nil
|
||||
}
|
||||
|
||||
// isDataAvailable blocks until all sidecars committed to in the block are available,
|
||||
// or an error or context cancellation occurs. A nil result means that the data availability check is successful.
|
||||
// The function will first check the database to see if all sidecars have been persisted. If any
|
||||
// sidecars are missing, it will then read from the sidecar notifier channel for the given root until the channel is
|
||||
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
|
||||
func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signedBlock interfaces.ReadOnlySignedBeaconBlock) error {
|
||||
block := signedBlock.Block()
|
||||
if block == nil {
|
||||
return errors.New("invalid nil beacon block")
|
||||
}
|
||||
|
||||
blockVersion := block.Version()
|
||||
if blockVersion >= version.Fulu {
|
||||
return s.areDataColumnsAvailable(ctx, root, block)
|
||||
}
|
||||
|
||||
if blockVersion >= version.Deneb {
|
||||
return s.areBlobsAvailable(ctx, root, block)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// areDataColumnsAvailable blocks until all data columns committed to in the block are available,
|
||||
// or an error or context cancellation occurs. A nil result means that the data availability check is successful.
|
||||
func (s *Service) areDataColumnsAvailable(ctx context.Context, root [fieldparams.RootLength]byte, block interfaces.ReadOnlyBeaconBlock) error {
|
||||
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
|
||||
blockSlot, currentSlot := block.Slot(), s.CurrentSlot()
|
||||
blockEpoch, currentEpoch := slots.ToEpoch(blockSlot), slots.ToEpoch(currentSlot)
|
||||
|
||||
if !params.WithinDAPeriod(blockEpoch, currentEpoch) {
|
||||
return nil
|
||||
}
|
||||
|
||||
body := block.Body()
|
||||
if body == nil {
|
||||
return errors.New("invalid nil beacon block body")
|
||||
}
|
||||
|
||||
kzgCommitments, err := body.BlobKzgCommitments()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "blob KZG commitments")
|
||||
}
|
||||
|
||||
// If block has not commitments there is nothing to wait for.
|
||||
if len(kzgCommitments) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// All columns to sample need to be available for the block to be considered available.
|
||||
// https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#custody-sampling
|
||||
nodeID := s.cfg.P2P.NodeID()
|
||||
|
||||
// Prevent custody group count to change during the rest of the function.
|
||||
s.cfg.CustodyInfo.Mut.RLock()
|
||||
defer s.cfg.CustodyInfo.Mut.RUnlock()
|
||||
|
||||
// Get the custody group sampling size for the node.
|
||||
custodyGroupSamplingSize := s.cfg.CustodyInfo.CustodyGroupSamplingSize(peerdas.Actual)
|
||||
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupSamplingSize)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "peer info")
|
||||
}
|
||||
|
||||
// Subscribe to newly data columns stored in the database.
|
||||
subscription, identsChan := s.dataColumnStorage.Subscribe()
|
||||
defer subscription.Unsubscribe()
|
||||
|
||||
// Get the count of data columns we already have in the store.
|
||||
summary := s.dataColumnStorage.Summary(root)
|
||||
storedDataColumnsCount := summary.Count()
|
||||
|
||||
minimumColumnCountToReconstruct := peerdas.MinimumColumnsCountToReconstruct()
|
||||
|
||||
// As soon as we have enough data column sidecars, we can reconstruct the missing ones.
|
||||
// We don't need to wait for the rest of the data columns to declare the block as available.
|
||||
if storedDataColumnsCount >= minimumColumnCountToReconstruct {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get a map of data column indices that are not currently available.
|
||||
missingMap, err := missingDataColumnIndices(s.dataColumnStorage, root, peerInfo.CustodyColumns)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "missing data columns")
|
||||
}
|
||||
|
||||
// If there are no missing indices, all data column sidecars are available.
|
||||
// This is the happy path.
|
||||
if len(missingMap) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Log for DA checks that cross over into the next slot; helpful for debugging.
|
||||
nextSlot := slots.BeginsAt(block.Slot()+1, s.genesisTime)
|
||||
|
||||
// Avoid logging if DA check is called after next slot start.
|
||||
if nextSlot.After(time.Now()) {
|
||||
timer := time.AfterFunc(time.Until(nextSlot), func() {
|
||||
missingMapCount := uint64(len(missingMap))
|
||||
|
||||
if missingMapCount == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
expected interface{} = "all"
|
||||
missing interface{} = "all"
|
||||
)
|
||||
|
||||
numberOfColumns := params.BeaconConfig().NumberOfColumns
|
||||
colMapCount := uint64(len(peerInfo.CustodyColumns))
|
||||
|
||||
if colMapCount < numberOfColumns {
|
||||
expected = uint64MapToSortedSlice(peerInfo.CustodyColumns)
|
||||
}
|
||||
|
||||
if missingMapCount < numberOfColumns {
|
||||
missing = uint64MapToSortedSlice(missingMap)
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": block.Slot(),
|
||||
"root": fmt.Sprintf("%#x", root),
|
||||
"columnsExpected": expected,
|
||||
"columnsWaiting": missing,
|
||||
}).Warning("Data columns still missing at slot end")
|
||||
})
|
||||
defer timer.Stop()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case idents := <-identsChan:
|
||||
if idents.Root != root {
|
||||
// This is not the root we are looking for.
|
||||
continue
|
||||
}
|
||||
|
||||
for _, index := range idents.Indices {
|
||||
// This is a data column we are expecting.
|
||||
if _, ok := missingMap[index]; ok {
|
||||
storedDataColumnsCount++
|
||||
}
|
||||
|
||||
// As soon as we have more than half of the data columns, we can reconstruct the missing ones.
|
||||
// We don't need to wait for the rest of the data columns to declare the block as available.
|
||||
if storedDataColumnsCount >= minimumColumnCountToReconstruct {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove the index from the missing map.
|
||||
delete(missingMap, index)
|
||||
|
||||
// Return if there is no more missing data columns.
|
||||
if len(missingMap) == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
var missingIndices interface{} = "all"
|
||||
numberOfColumns := params.BeaconConfig().NumberOfColumns
|
||||
missingIndicesCount := uint64(len(missingMap))
|
||||
|
||||
if missingIndicesCount < numberOfColumns {
|
||||
missingIndices = uint64MapToSortedSlice(missingMap)
|
||||
}
|
||||
|
||||
return errors.Wrapf(ctx.Err(), "data column sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndices)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// areBlobsAvailable blocks until all BlobSidecars committed to in the block are available,
|
||||
// or an error or context cancellation occurs. A nil result means that the data availability check is successful.
|
||||
func (s *Service) areBlobsAvailable(ctx context.Context, root [fieldparams.RootLength]byte, block interfaces.ReadOnlyBeaconBlock) error {
|
||||
blockSlot := block.Slot()
|
||||
|
||||
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
|
||||
if !params.WithinDAPeriod(slots.ToEpoch(block.Slot()), slots.ToEpoch(s.CurrentSlot())) {
|
||||
return nil
|
||||
@@ -634,9 +831,9 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
|
||||
return nil
|
||||
}
|
||||
// get a map of BlobSidecar indices that are not currently available.
|
||||
missing, err := missingIndices(s.blobStorage, root, kzgCommitments, block.Slot())
|
||||
missing, err := missingBlobIndices(s.blobStorage, root, kzgCommitments, block.Slot())
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "missing indices")
|
||||
}
|
||||
// If there are no missing indices, all BlobSidecars are available.
|
||||
if len(missing) == 0 {
|
||||
@@ -648,15 +845,20 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
|
||||
nc := s.blobNotifiers.forRoot(root, block.Slot())
|
||||
|
||||
// Log for DA checks that cross over into the next slot; helpful for debugging.
|
||||
nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime)
|
||||
nextSlot := slots.BeginsAt(block.Slot()+1, s.genesisTime)
|
||||
// Avoid logging if DA check is called after next slot start.
|
||||
if nextSlot.After(time.Now()) {
|
||||
nst := time.AfterFunc(time.Until(nextSlot), func() {
|
||||
if len(missing) == 0 {
|
||||
return
|
||||
}
|
||||
log.WithFields(daCheckLogFields(root, signed.Block().Slot(), expected, len(missing))).
|
||||
Error("Still waiting for DA check at slot end.")
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": blockSlot,
|
||||
"root": fmt.Sprintf("%#x", root),
|
||||
"blobsExpected": expected,
|
||||
"blobsWaiting": len(missing),
|
||||
}).Error("Still waiting for blobs DA check at slot end.")
|
||||
})
|
||||
defer nst.Stop()
|
||||
}
|
||||
@@ -678,13 +880,14 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
|
||||
}
|
||||
}
|
||||
|
||||
func daCheckLogFields(root [32]byte, slot primitives.Slot, expected, missing int) logrus.Fields {
|
||||
return logrus.Fields{
|
||||
"slot": slot,
|
||||
"root": fmt.Sprintf("%#x", root),
|
||||
"blobsExpected": expected,
|
||||
"blobsWaiting": missing,
|
||||
// uint64MapToSortedSlice produces a sorted uint64 slice from a map.
|
||||
func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
|
||||
output := make([]uint64, 0, len(input))
|
||||
for idx := range input {
|
||||
output = append(output, idx)
|
||||
}
|
||||
slices.Sort[[]uint64](output)
|
||||
return output
|
||||
}
|
||||
|
||||
// lateBlockTasks is called 4 seconds into the slot and performs tasks
|
||||
@@ -770,7 +973,7 @@ func (s *Service) waitForSync() error {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) handleInvalidExecutionError(ctx context.Context, err error, blockRoot [32]byte, parentRoot [32]byte) error {
|
||||
func (s *Service) handleInvalidExecutionError(ctx context.Context, err error, blockRoot, parentRoot [fieldparams.RootLength]byte) error {
|
||||
if IsInvalidBlock(err) && InvalidBlockLVH(err) != [32]byte{} {
|
||||
return s.pruneInvalidBlock(ctx, blockRoot, parentRoot, InvalidBlockLVH(err))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user